[hornetq-commits] JBoss hornetq SVN: r9591 - in branches/Branch_2_1: src/main/org/hornetq/core/journal and 5 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Aug 24 10:21:25 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-08-24 10:21:23 -0400 (Tue, 24 Aug 2010)
New Revision: 9591
Added:
branches/Branch_2_1/merge-activity.txt
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/CompactJournal.java
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/OldFormatTest.java
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/RecordInfo.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
Log:
merge journal fixes from trunk
Added: branches/Branch_2_1/merge-activity.txt
===================================================================
--- branches/Branch_2_1/merge-activity.txt (rev 0)
+++ branches/Branch_2_1/merge-activity.txt 2010-08-24 14:21:23 UTC (rev 9591)
@@ -0,0 +1,9 @@
+Detailed list of merges that happened at this branch.
+
+
+- Date - author - Description
+
+- 24-aug-2010 - clebert - Branch created from https://svn.jboss.org/repos/hornetq/tags/HornetQ_2_1_2_Final/
+
+- 24-aug-2010 - clebert - merge from trunk -r9588:9590
+ There was also a manual copy of JournalImpl.java on this merge, since there was a minor change before that needed to be applied
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/RecordInfo.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/RecordInfo.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/RecordInfo.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -23,7 +23,7 @@
*/
public class RecordInfo
{
- public RecordInfo(final long id, final byte userRecordType, final byte[] data, final boolean isUpdate)
+ public RecordInfo(final long id, final byte userRecordType, final byte[] data, final boolean isUpdate, final short compactCount)
{
this.id = id;
@@ -32,8 +32,15 @@
this.data = data;
this.isUpdate = isUpdate;
+
+ this.compactCount = compactCount;
}
+ /** How many times this record was compacted (up to 7 times)
+ After the record has reached 7 times, it will always be 7
+ As we only store up to 0x7 binary, as part of the recordID (binary 111) */
+ public final short compactCount;
+
public final long id;
public final byte userRecordType;
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/TestableJournal.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -57,8 +57,6 @@
void compact() throws Exception;
- void cleanUp(final JournalFile file) throws Exception;
-
JournalFile getCurrentFile();
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -218,7 +218,7 @@
sequentialFile.open(1, false);
- currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++);
+ currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++, JournalImpl.FORMAT_VERSION);
JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID());
}
Added: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/CompactJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/CompactJournal.java (rev 0)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/CompactJournal.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.utils.Base64;
+
+/**
+ * This is an undocumented class, that will open a journal and force compacting on it.
+ * It may be used under special cases, but it shouldn't be needed under regular circunstances as the system should detect
+ * the need for compacting.
+ *
+ * The regular use is to configure min-compact parameters.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class CompactJournal
+{
+
+ public static void main(String arg[])
+ {
+ if (arg.length != 4)
+ {
+ System.err.println("Use: java -cp hornetq-core.jar org.hornetq.core.journal.impl.CompactJournal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize>");
+ return;
+ }
+
+ try
+ {
+ compactJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]));
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ }
+
+ public static void compactJournal(String directory,
+ String journalPrefix,
+ String journalSuffix,
+ int minFiles,
+ int fileSize) throws Exception
+ {
+ NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
+
+ JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
+
+ journal.start();
+
+ journal.loadInternalOnly();
+
+ journal.compact();
+
+ journal.stop();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -183,6 +183,8 @@
recordInfo.data.length +
",isUpdate@" +
recordInfo.isUpdate +
+ ",compactCount@" +
+ recordInfo.compactCount +
",data@" +
encode(recordInfo.data);
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -279,11 +279,11 @@
protected static RecordInfo parseRecord(Properties properties) throws Exception
{
- int id = parseInt("id", properties);
+ long id = parseLong("id", properties);
byte userRecordType = parseByte("userRecordType", properties);
boolean isUpdate = parseBoolean("isUpdate", properties);
byte[] data = parseEncoding("data", properties);
- return new RecordInfo(id, userRecordType, data, isUpdate);
+ return new RecordInfo(id, userRecordType, data, isUpdate, (short)0);
}
private static byte[] parseEncoding(String name, Properties properties) throws Exception
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -1,212 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.journal.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
-import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
-import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
-import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
-import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
-import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
-import org.hornetq.core.journal.impl.dataformat.JournalRollbackRecordTX;
-
-/**
- * A JournalCleaner
- *
- * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class JournalCleaner extends AbstractJournalUpdateTask
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private final HashMap<Long, AtomicInteger> transactionCounter = new HashMap<Long, AtomicInteger>();
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
- /**
- * @param fileFactory
- * @param journal
- * @param nextOrderingID
- */
- protected JournalCleaner(final SequentialFileFactory fileFactory,
- final JournalImpl journal,
- final Set<Long> recordsSnapshot,
- final long nextOrderingID) throws Exception
- {
- super(fileFactory, journal, recordsSnapshot, nextOrderingID);
- openFile();
- }
-
- // Public --------------------------------------------------------
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#markAsDataFile(org.hornetq.core.journal.impl.JournalFile)
- */
- public void markAsDataFile(final JournalFile file)
- {
- // nothing to be done here
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadAddRecord(org.hornetq.core.journal.RecordInfo)
- */
- public void onReadAddRecord(final RecordInfo info) throws Exception
- {
- if (lookupRecord(info.id))
- {
- writeEncoder(new JournalAddRecord(true, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data)));
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadAddRecordTX(long, org.hornetq.core.journal.RecordInfo)
- */
- public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
- {
- if (lookupRecord(recordInfo.id))
- {
- incrementTransactionCounter(transactionID);
-
- writeEncoder(new JournalAddRecordTX(true,
- transactionID,
- recordInfo.id,
- recordInfo.getUserRecordType(),
- new ByteArrayEncoding(recordInfo.data)));
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadCommitRecord(long, int)
- */
- public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
- {
- int txcounter = getTransactionCounter(transactionID);
-
- writeEncoder(new JournalCompleteRecordTX(true, transactionID, null), txcounter);
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadDeleteRecord(long)
- */
- public void onReadDeleteRecord(final long recordID) throws Exception
- {
- writeEncoder(new JournalDeleteRecord(recordID));
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadDeleteRecordTX(long, org.hornetq.core.journal.RecordInfo)
- */
- public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
- {
- incrementTransactionCounter(transactionID);
-
- writeEncoder(new JournalDeleteRecordTX(transactionID, recordInfo.id, new ByteArrayEncoding(recordInfo.data)));
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadPrepareRecord(long, byte[], int)
- */
- public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
- {
- int txcounter = getTransactionCounter(transactionID);
-
- writeEncoder(new JournalCompleteRecordTX(false, transactionID, new ByteArrayEncoding(extraData)), txcounter);
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadRollbackRecord(long)
- */
- public void onReadRollbackRecord(final long transactionID) throws Exception
- {
- writeEncoder(new JournalRollbackRecordTX(transactionID));
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadUpdateRecord(org.hornetq.core.journal.RecordInfo)
- */
- public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
- {
- if (lookupRecord(recordInfo.id))
- {
- writeEncoder(new JournalAddRecord(false,
- recordInfo.id,
- recordInfo.userRecordType,
- new ByteArrayEncoding(recordInfo.data)));
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadUpdateRecordTX(long, org.hornetq.core.journal.RecordInfo)
- */
- public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
- {
- if (lookupRecord(recordInfo.id))
- {
- incrementTransactionCounter(transactionID);
-
- writeEncoder(new JournalAddRecordTX(false,
- transactionID,
- recordInfo.id,
- recordInfo.userRecordType,
- new ByteArrayEncoding(recordInfo.data)));
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected int incrementTransactionCounter(final long transactionID)
- {
- AtomicInteger counter = transactionCounter.get(transactionID);
- if (counter == null)
- {
- counter = new AtomicInteger(0);
- transactionCounter.put(transactionID, counter);
- }
-
- return counter.incrementAndGet();
- }
-
- protected int getTransactionCounter(final long transactionID)
- {
- AtomicInteger counter = transactionCounter.get(transactionID);
- if (counter == null)
- {
- return 0;
- }
- else
- {
- return counter.intValue();
- }
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -47,6 +47,11 @@
{
private static final Logger log = Logger.getLogger(JournalCompactor.class);
+
+ // We try to separate old record from new ones when doing the compacting
+ // this is a split line
+ // We will force a moveNextFiles when the compactCount is bellow than COMPACT_SPLIT_LINE
+ private final short COMPACT_SPLIT_LINE = 2;
// Snapshot of transactions that were pending when the compactor started
private final Map<Long, PendingTransaction> pendingTransactions = new ConcurrentHashMap<Long, PendingTransaction>();
@@ -69,7 +74,7 @@
if (controlFile.exists())
{
- JournalFile file = new JournalFileImpl(controlFile, 0);
+ JournalFile file = new JournalFileImpl(controlFile, 0, JournalImpl.FORMAT_VERSION);
final ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -209,19 +214,65 @@
private void checkSize(final int size) throws Exception
{
+ checkSize(size, -1);
+ }
+
+ private void checkSize(final int size, final int compactCount) throws Exception
+ {
if (getWritingChannel() == null)
{
- openFile();
+ if (!checkCompact(compactCount))
+ {
+ // will need to open a file either way
+ openFile();
+ }
}
else
{
+ if (compactCount >= 0)
+ {
+ if (checkCompact(compactCount))
+ {
+ // The file was already moved on this case, no need to check for the size.
+ // otherwise we will also need to check for the size
+ return;
+ }
+ }
+
if (getWritingChannel().writerIndex() + size > getWritingChannel().capacity())
{
openFile();
}
}
}
+
+ int currentCount;
+ // This means we will need to split when the compactCount is bellow the watermark
+ boolean willNeedToSplit = false;
+ boolean splitted = false;
+ private boolean checkCompact(final int compactCount) throws Exception
+ {
+ if (compactCount >= COMPACT_SPLIT_LINE && !splitted)
+ {
+ willNeedToSplit = true;
+ }
+
+ if (willNeedToSplit && compactCount < COMPACT_SPLIT_LINE)
+ {
+ willNeedToSplit = false;
+ splitted = false;
+ openFile();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+
+
/**
* Replay pending counts that happened during compacting
*/
@@ -252,9 +303,10 @@
info.id,
info.getUserRecordType(),
new ByteArrayEncoding(info.data));
+ addRecord.setCompactCount((short)(info.compactCount + 1));
+
+ checkSize(addRecord.getEncodeSize(), info.compactCount);
- checkSize(addRecord.getEncodeSize());
-
writeEncoder(addRecord);
newRecords.put(info.id, new JournalRecord(currentFile, addRecord.getEncodeSize()));
@@ -273,7 +325,9 @@
info.getUserRecordType(),
new ByteArrayEncoding(info.data));
- checkSize(record.getEncodeSize());
+ record.setCompactCount((short)(info.compactCount + 1));
+
+ checkSize(record.getEncodeSize(), info.compactCount);
newTransaction.addPositive(currentFile, info.id, record.getEncodeSize());
@@ -395,7 +449,9 @@
info.userRecordType,
new ByteArrayEncoding(info.data));
- checkSize(updateRecord.getEncodeSize());
+ updateRecord.setCompactCount((short)(info.compactCount + 1));
+
+ checkSize(updateRecord.getEncodeSize(), info.compactCount);
JournalRecord newRecord = newRecords.get(info.id);
@@ -425,7 +481,9 @@
info.userRecordType,
new ByteArrayEncoding(info.data));
- checkSize(updateRecordTX.getEncodeSize());
+ updateRecordTX.setCompactCount((short)(info.compactCount + 1));
+
+ checkSize(updateRecordTX.getEncodeSize(), info.compactCount);
writeEncoder(updateRecordTX);
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -53,10 +53,6 @@
boolean isCanReclaim();
- void setNeedCleanup(boolean needCleanup);
-
- boolean isNeedCleanup();
-
long getOffset();
/** This is a field to identify that records on this file actually belong to the current file.
@@ -64,6 +60,8 @@
int getRecordID();
long getFileID();
+
+ int getJournalVersion();
SequentialFile getFile();
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -36,7 +36,7 @@
private final SequentialFile file;
private final long fileID;
-
+
private final int recordID;
private long offset;
@@ -47,19 +47,20 @@
private boolean canReclaim;
- private boolean needCleanup;
-
private AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
-
+ private final int version;
+
private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
- public JournalFileImpl(final SequentialFile file, final long fileID)
+ public JournalFileImpl(final SequentialFile file, final long fileID, final int version)
{
this.file = file;
this.fileID = fileID;
-
+
+ this.version = version;
+
this.recordID = (int)(fileID & (long)Integer.MAX_VALUE);
}
@@ -81,16 +82,6 @@
return canReclaim;
}
- public boolean isNeedCleanup()
- {
- return needCleanup;
- }
-
- public void setNeedCleanup(final boolean needCleanup)
- {
- this.needCleanup = needCleanup;
- }
-
public void setCanReclaim(final boolean canReclaim)
{
this.canReclaim = canReclaim;
@@ -119,6 +110,11 @@
}
}
+ public int getJournalVersion()
+ {
+ return version;
+ }
+
public boolean resetNegCount(final JournalFile file)
{
return negCounts.remove(file) != null;
@@ -148,7 +144,7 @@
{
return fileID;
}
-
+
public int getRecordID()
{
return recordID;
@@ -227,12 +223,10 @@
{
return liveBytes.get();
}
-
+
public int getTotalNegativeToOthers()
{
return totalNegativeToOthers.get();
}
-
-
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -67,6 +68,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.DataConstants;
+
/**
*
* <p>A circular log implementation.</p
@@ -88,7 +90,9 @@
private static final int STATE_LOADED = 2;
- private static final int FORMAT_VERSION = 1;
+ public static final int FORMAT_VERSION = 2;
+
+ private static final int COMPATIBLE_VERSIONS[] = new int[] {1};
// Static --------------------------------------------------------
@@ -183,9 +187,9 @@
public final String fileExtension;
- private final LinkedBlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
+ private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
- private final LinkedBlockingDeque<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
+ private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
@@ -398,9 +402,10 @@
try
{
- long fileID = readFileHeader(file);
+
+ JournalFileImpl jrnFile = readFileHeader(file);
- orderedFiles.add(new JournalFileImpl(file, fileID));
+ orderedFiles.add(jrnFile);
}
finally
{
@@ -499,6 +504,21 @@
continue;
}
+ short compactCount = 0;
+
+ if (file.getJournalVersion() >= 2)
+ {
+ if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_BYTE))
+ {
+ reader.markAsDataFile(file);
+
+ wholeFileBuffer.position(pos + 1);
+ continue;
+ }
+
+ compactCount = wholeFileBuffer.get();
+ }
+
long transactionID = 0;
if (JournalImpl.isTransaction(recordType))
@@ -602,7 +622,7 @@
variableSize = 0;
}
- int recordSize = JournalImpl.getRecordSize(recordType);
+ int recordSize = JournalImpl.getRecordSize(recordType, file.getJournalVersion());
// VI - this is completing V, We will validate the size at the end
// of the record,
@@ -676,13 +696,13 @@
{
case ADD_RECORD:
{
- reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false));
+ reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount));
break;
}
case UPDATE_RECORD:
{
- reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true));
+ reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true, compactCount));
break;
}
@@ -694,19 +714,19 @@
case ADD_RECORD_TX:
{
- reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
+ reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false, compactCount));
break;
}
case UPDATE_RECORD_TX:
{
- reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
+ reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true, compactCount));
break;
}
case DELETE_RECORD_TX:
{
- reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
+ reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true, compactCount));
break;
}
@@ -1647,6 +1667,7 @@
else
{
log.warn("Couldn't find tx=" + newTransaction.getId() + " to merge after compacting");
+ System.exit(-1);
}
}
}
@@ -1693,6 +1714,7 @@
* <tr><td><b>Field Name</b></td><td><b>Size</b></td></tr>
* <tr><td>RecordType</td><td>Byte (1)</td></tr>
* <tr><td>FileID</td><td>Integer (4 bytes)</td></tr>
+ * <tr><td>Compactor Counter</td><td>1 byte</td></tr>
* <tr><td>TransactionID <i>(if record is transactional)</i></td><td>Long (8 bytes)</td></tr>
* <tr><td>RecordID</td><td>Long (8 bytes)</td></tr>
* <tr><td>BodySize(Add, update and delete)</td><td>Integer (4 bytes)</td></tr>
@@ -1708,6 +1730,7 @@
* <tr><td><b>Field Name</b></td><td><b>Size</b></td></tr>
* <tr><td>RecordType</td><td>Byte (1)</td></tr>
* <tr><td>FileID</td><td>Integer (4 bytes)</td></tr>
+ * <tr><td>Compactor Counter</td><td>1 byte</td></tr>
* <tr><td>TransactionID <i>(if record is transactional)</i></td><td>Long (8 bytes)</td></tr>
* <tr><td>ExtraDataLength (Prepares only)</td><td>Integer (4 bytes)</td></tr>
* <tr><td>Number Of Files (N)</td><td>Integer (4 bytes)</td></tr>
@@ -1776,7 +1799,7 @@
loadManager.addRecord(info);
- records.put(info.id, new JournalRecord(file, info.data.length + JournalImpl.SIZE_ADD_RECORD));
+ records.put(info.id, new JournalRecord(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1));
}
public void onReadUpdateRecord(final RecordInfo info) throws Exception
@@ -1795,7 +1818,7 @@
// have been deleted
// just leaving some updates in this file
- posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD);
+ posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact count
}
}
@@ -1845,7 +1868,7 @@
transactions.put(transactionID, tnp);
}
- tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX);
+ tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact count
}
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
@@ -2135,7 +2158,6 @@
if (file.isCanReclaim())
{
// File can be reclaimed or deleted
-
if (JournalImpl.trace)
{
JournalImpl.trace("Reclaiming file " + file);
@@ -2149,67 +2171,6 @@
addFreeFile(file, false);
}
}
-
- int nCleanup = 0;
- for (JournalFile file : dataFiles)
- {
- if (file.isNeedCleanup())
- {
- nCleanup++;
- }
- }
-
- if (compactMinFiles > 0)
- {
- if (nCleanup > 0 && needsCompact())
- {
- for (JournalFile file : dataFiles)
- {
- if (file.isNeedCleanup())
- {
- final JournalFile cleanupFile = file;
-
- if (compactorRunning.compareAndSet(false, true))
- {
- // The cleanup should happen rarely.
- // but when it happens it needs to use a different thread,
- // or opening new files or any other executor's usage will be blocked while the cleanUp is being
- // processed.
-
- compactorExecutor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- cleanUp(cleanupFile);
- }
- catch (Throwable e)
- {
- JournalImpl.log.warn(e.getMessage(), e);
- }
- finally
- {
- compactorRunning.set(false);
- if (autoReclaim)
- {
- scheduleReclaim();
- }
- }
- }
- });
- }
- return true;
- }
- else
- {
- // We only cleanup the first files
- // if a middle file needs cleanup it will be done through compacting
- break;
- }
- }
- }
- }
}
finally
{
@@ -2219,132 +2180,9 @@
return false;
}
- // This method is public for tests
- public synchronized void cleanUp(final JournalFile file) throws Exception
- {
- if (state != JournalImpl.STATE_LOADED)
- {
- return;
- }
+
+ int deleteme = 0;
- try
- {
- JournalCleaner cleaner = null;
- ArrayList<JournalFile> dependencies = new ArrayList<JournalFile>();
- compactingLock.writeLock().lock();
-
- try
- {
-
- if (JournalImpl.trace)
- {
- JournalImpl.trace("Cleaning up file " + file);
- }
- JournalImpl.log.debug("Cleaning up file " + file);
-
- if (file.getPosCount() == 0)
- {
- // nothing to be done
- return;
- }
-
- // We don't want this file to be reclaimed during the cleanup
- file.incPosCount();
-
- // The file will have all the deleted records removed, so all the NegCount towards the file being cleaned up
- // could be reset
- for (JournalFile jrnFile : dataFiles)
- {
- if (jrnFile.resetNegCount(file))
- {
- dependencies.add(jrnFile);
- jrnFile.incPosCount(); // this file can't be reclaimed while cleanup is being done
- }
- }
-
- currentFile.resetNegCount(file);
- currentFile.incPosCount();
- dependencies.add(currentFile);
-
- cleaner = new JournalCleaner(fileFactory, this, records.keySet(), file.getFileID());
- }
- finally
- {
- compactingLock.writeLock().unlock();
- }
-
- compactingLock.readLock().lock();
-
- try
- {
- JournalImpl.readJournalFile(fileFactory, file, cleaner);
- }
- catch (Throwable e)
- {
- log.warn("Error reading cleanup on " + file, e);
- throw new Exception("Error reading cleanup on " + file, e);
- }
-
- cleaner.flush();
-
- // pointcut for tests
- // We need to test concurrent updates on the journal, as the compacting is being performed.
- // Usually tests will use this to hold the compacting while other structures are being updated.
- onCompactDone();
-
- for (JournalFile jrnfile : dependencies)
- {
- jrnfile.decPosCount();
- }
- file.decPosCount();
-
- SequentialFile tmpFile = cleaner.currentFile.getFile();
- String tmpFileName = tmpFile.getFileName();
- String cleanedFileName = file.getFile().getFileName();
-
- SequentialFile controlFile = createControlFile(null, null, new Pair<String, String>(tmpFileName,
- cleanedFileName));
-
- SequentialFile returningFile = fileFactory.createSequentialFile(file.getFile().getFileName(), maxAIO);
-
- returningFile.renameTo(renameExtensionFile(tmpFileName, ".cmp") + ".tmp");
-
- tmpFile.renameTo(cleanedFileName);
-
- controlFile.delete();
-
- final JournalFile retJournalfile = new JournalFileImpl(returningFile, -1);
-
- if (trace)
- {
- trace("Adding free file back from cleanup" + retJournalfile);
- }
-
- filesExecutor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- addFreeFile(retJournalfile, true);
- }
- catch (Throwable e)
- {
- log.warn("Error reinitializing file " + file, e);
- }
-
- }
- });
-
- }
- finally
- {
- compactingLock.readLock().unlock();
- JournalImpl.log.debug("Clean up on file " + file + " done");
- }
-
- }
-
private boolean needsCompact() throws Exception
{
JournalFile[] dataFiles = getDataFiles();
@@ -2359,8 +2197,10 @@
long totalBytes = (long)dataFiles.length * (long)fileSize;
long compactMargin = (long)(totalBytes * compactPercentage);
+
+ boolean needCompact = (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles);
- return (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles);
+ return needCompact;
}
@@ -2840,7 +2680,7 @@
int position = initFileHeader(this.fileFactory, sf, userVersion, newFileID);
- JournalFile jf = new JournalFileImpl(sf, newFileID);
+ JournalFile jf = new JournalFileImpl(sf, newFileID, FORMAT_VERSION);
sf.position(position);
@@ -2886,7 +2726,7 @@
return recordType >= JournalImpl.ADD_RECORD && recordType <= JournalImpl.DELETE_RECORD_TX;
}
- private static int getRecordSize(final byte recordType)
+ private static int getRecordSize(final byte recordType, final int journalVersion)
{
// The record size (without the variable portion)
int recordSize = 0;
@@ -2925,7 +2765,14 @@
throw new IllegalStateException("Record other than expected");
}
- return recordSize;
+ if (journalVersion >= 2)
+ {
+ return recordSize + 1;
+ }
+ else
+ {
+ return recordSize;
+ }
}
/**
@@ -2933,17 +2780,30 @@
* @return
* @throws Exception
*/
- private long readFileHeader(SequentialFile file) throws Exception
+ private JournalFileImpl readFileHeader(SequentialFile file) throws Exception
{
ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
file.read(bb);
int journalVersion = bb.getInt();
-
+
if (journalVersion != FORMAT_VERSION)
{
- throw new HornetQException(HornetQException.IO_ERROR, "Journal files version mismatch");
+ boolean isCompatible = false;
+
+ for (int v : COMPATIBLE_VERSIONS)
+ {
+ if (v == journalVersion)
+ {
+ isCompatible = true;
+ }
+ }
+
+ if (!isCompatible)
+ {
+ throw new HornetQException(HornetQException.IO_ERROR, "Journal files version mismatch. You should export the data from the previous version and import it as explained on the user's manual");
+ }
}
int readUserVersion = bb.getInt();
@@ -2958,7 +2818,8 @@
fileFactory.releaseBuffer(bb);
bb = null;
- return fileID;
+
+ return new JournalFileImpl(file, fileID, journalVersion);
}
/**
@@ -3173,7 +3034,7 @@
sequentialFile.position(position);
}
- return new JournalFileImpl(sequentialFile, fileID);
+ return new JournalFileImpl(sequentialFile, fileID, FORMAT_VERSION);
}
/**
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -53,8 +53,6 @@
JournalFile currentFile = files[i];
- currentFile.setNeedCleanup(false);
-
int posCount = currentFile.getPosCount();
int totNeg = 0;
@@ -101,18 +99,7 @@
{
Reclaimer.trace(currentFile + " Can't be reclaimed because " + file + " has negative values");
}
- file.setNeedCleanup(true);
-
- if (file.getTotalNegativeToOthers() == 0)
- {
- file.setNeedCleanup(true);
- }
- else
- {
- // This file can't be cleared as the file has negatives to other files as well
- file.setNeedCleanup(false);
- }
-
+
currentFile.setCanReclaim(false);
break;
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -66,6 +66,8 @@
}
buffer.writeInt(fileID);
+
+ buffer.writeByte(compactCount);
buffer.writeLong(id);
@@ -81,6 +83,6 @@
@Override
public int getEncodeSize()
{
- return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize();
+ return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize() + 1;
}
}
\ No newline at end of file
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -75,6 +75,8 @@
}
buffer.writeInt(fileID);
+
+ buffer.writeByte(compactCount);
buffer.writeLong(txID);
@@ -92,6 +94,6 @@
@Override
public int getEncodeSize()
{
- return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize();
+ return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize() + 1;
}
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -70,6 +70,8 @@
}
buffer.writeInt(fileID);
+
+ buffer.writeByte(compactCount);
buffer.writeLong(txID);
@@ -105,11 +107,11 @@
{
if (isCommit)
{
- return JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD;
+ return JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD + 1;
}
else
{
- return JournalImpl.SIZE_PREPARE_RECORD + (transactionData != null ? transactionData.getEncodeSize() : 0);
+ return JournalImpl.SIZE_PREPARE_RECORD + (transactionData != null ? transactionData.getEncodeSize() : 0) + 1;
}
}
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -44,6 +44,8 @@
buffer.writeByte(JournalImpl.DELETE_RECORD);
buffer.writeInt(fileID);
+
+ buffer.writeByte(compactCount);
buffer.writeLong(id);
@@ -53,6 +55,6 @@
@Override
public int getEncodeSize()
{
- return JournalImpl.SIZE_DELETE_RECORD;
+ return JournalImpl.SIZE_DELETE_RECORD + 1;
}
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -55,6 +55,8 @@
buffer.writeByte(JournalImpl.DELETE_RECORD_TX);
buffer.writeInt(fileID);
+
+ buffer.writeByte(compactCount);
buffer.writeLong(txID);
@@ -73,6 +75,6 @@
@Override
public int getEncodeSize()
{
- return JournalImpl.SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0);
+ return JournalImpl.SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0) + 1;
}
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -27,6 +27,8 @@
{
protected int fileID;
+
+ protected byte compactCount;
public int getFileID()
{
@@ -50,6 +52,23 @@
{
return 0;
}
+
+ public short getCompactCount()
+ {
+ return compactCount;
+ }
+
+ public void setCompactCount(final short compactCount)
+ {
+ if (compactCount > Byte.MAX_VALUE)
+ {
+ this.compactCount = Byte.MAX_VALUE;
+ }
+ else
+ {
+ this.compactCount = (byte)compactCount;
+ }
+ }
public abstract int getEncodeSize();
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -39,14 +39,15 @@
{
buffer.writeByte(JournalImpl.ROLLBACK_RECORD);
buffer.writeInt(fileID);
+ buffer.writeByte(compactCount);
buffer.writeLong(txID);
- buffer.writeInt(JournalImpl.SIZE_ROLLBACK_RECORD);
+ buffer.writeInt(JournalImpl.SIZE_ROLLBACK_RECORD + 1);
}
@Override
public int getEncodeSize()
{
- return JournalImpl.SIZE_ROLLBACK_RECORD;
+ return JournalImpl.SIZE_ROLLBACK_RECORD + 1;
}
}
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -39,7 +39,6 @@
import org.hornetq.utils.IDGenerator;
import org.hornetq.utils.SimpleIDGenerator;
import org.hornetq.utils.TimeAndCounterIDGenerator;
-import org.hornetq.utils.ReusableLatch;
/**
*
@@ -66,7 +65,7 @@
for (int i = 0; i < 5; i++)
{
SequentialFile file = fileFactory.createSequentialFile("file-" + i + ".tst", 1);
- dataFiles.add(new JournalFileImpl(file, 0));
+ dataFiles.add(new JournalFileImpl(file, 0, JournalImpl.FORMAT_VERSION));
}
ArrayList<JournalFile> newFiles = new ArrayList<JournalFile>();
@@ -74,7 +73,7 @@
for (int i = 0; i < 3; i++)
{
SequentialFile file = fileFactory.createSequentialFile("file-" + i + ".tst.new", 1);
- newFiles.add(new JournalFileImpl(file, 0));
+ newFiles.add(new JournalFileImpl(file, 0, JournalImpl.FORMAT_VERSION));
}
ArrayList<Pair<String, String>> renames = new ArrayList<Pair<String, String>>();
@@ -806,33 +805,8 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final ReusableLatch reusableLatchDone = new ReusableLatch();
- reusableLatchDone.countUp();
- final ReusableLatch reusableLatchWait = new ReusableLatch();
- reusableLatchWait.countUp();
-
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
- {
-
- @Override
- public void onCompactDone()
- {
- reusableLatchDone.countDown();
- System.out.println("Waiting on Compact");
- try
- {
- reusableLatchWait.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("Done");
- }
- };
-
- journal.setAutoReclaim(false);
-
+ createJournal();
+
startJournal();
load();
@@ -844,26 +818,8 @@
long addedRecord = idGen.generateID();
- Thread tCompact = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.compact();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
+ startCompact();
- tCompact.start();
-
- reusableLatchDone.await();
-
addTx(consumerTX, firstID);
addTx(appendTX, addedRecord);
@@ -876,10 +832,8 @@
delete(addedRecord);
- reusableLatchWait.countDown();
+ finishCompact();
- tCompact.join();
-
journal.forceMoveNextFile();
long newRecord = idGen.generateID();
@@ -959,52 +913,11 @@
setup(2, 60 * 1024, false);
- final ReusableLatch reusableLatchDone = new ReusableLatch();
- reusableLatchDone.countUp();
- final ReusableLatch reusableLatchWait = new ReusableLatch();
- reusableLatchWait.countUp();
+ createJournal();
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
- {
-
- @Override
- public void onCompactDone()
- {
- reusableLatchDone.countDown();
- System.out.println("Waiting on Compact");
- try
- {
- reusableLatchWait.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("Done");
- }
- };
-
- journal.setAutoReclaim(false);
-
startJournal();
load();
- Thread tCompact = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.cleanUp(journal.getDataFiles()[0]);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
-
for (int i = 0; i < 100; i++)
{
add(i);
@@ -1017,20 +930,16 @@
delete(i);
}
- tCompact.start();
-
- reusableLatchDone.await();
-
+ startCompact();
+
// Delete part of the live records while cleanup still working
for (int i = 1; i < 5; i++)
{
delete(i);
}
- reusableLatchWait.countDown();
-
- tCompact.join();
-
+ finishCompact();
+
// Delete part of the live records after cleanup is done
for (int i = 5; i < 10; i++)
{
@@ -1054,53 +963,12 @@
setup(2, 60 * 1024, false);
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+ createJournal();
- final ReusableLatch reusableLatchDone = new ReusableLatch();
- reusableLatchDone.countUp();
- final ReusableLatch reusableLatchWait = new ReusableLatch();
- reusableLatchWait.countUp();
-
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
- {
-
- @Override
- public void onCompactDone()
- {
- reusableLatchDone.countDown();
- System.out.println("Waiting on Compact");
- try
- {
- reusableLatchWait.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("Done");
- }
- };
-
- journal.setAutoReclaim(false);
-
startJournal();
load();
- Thread tCompact = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.compact();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
-
long appendTX = idGen.generateID();
long appendOne = idGen.generateID();
long appendTwo = idGen.generateID();
@@ -1109,9 +977,8 @@
addTx(appendTX, appendOne);
- tCompact.start();
- reusableLatchDone.await();
-
+ startCompact();
+
addTx(appendTX, appendTwo);
commit(appendTX);
@@ -1122,8 +989,7 @@
commit(updateTX);
// delete(appendTwo);
- reusableLatchWait.countDown();
- tCompact.join();
+ finishCompact();
journal.compact();
@@ -1239,13 +1105,13 @@
long id = idGenerator.generateID();
listToDelete.add(id);
- expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD);
+ expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD + 1);
add(id);
journal.forceMoveNextFile();
update(id);
- expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD);
+ expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD + 1);
journal.forceMoveNextFile();
}
@@ -1295,7 +1161,55 @@
}
}
+
+ public void testCompactFirstFileWithPendingCommits() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long tx = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ addTx(tx, idGenerator.generateID());
+ }
+
+ journal.forceMoveNextFile();
+ commit(tx);
+
+
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ // This operation used to be journal.cleanup(journal.getDataFiles()[0]); when cleanup was still in place
+ journal.compact();
+
+ journal.checkReclaimStatus();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
public void testLiveSizeTransactional() throws Exception
{
setup(2, 60 * 1024, true);
Added: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/OldFormatTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/OldFormatTest.java (rev 0)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/journal/OldFormatTest.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.journal;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A OldFormatTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class OldFormatTest extends JournalImplTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // This will generate records using the Version 1 format, and reading at the current version
+ public void testFormatOne() throws Exception
+ {
+ setup(2, 100 * 1024, true);
+
+ SequentialFile file = fileFactory.createSequentialFile("hq-1.hq", 1);
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(100 * 1024);
+
+ initHeader(buffer, 1);
+
+ byte[] record = new byte[1];
+
+ for (long i = 0 ; i < 10; i++)
+ {
+ add(buffer, 1, i, record);
+
+ update(buffer, 1, i, record);
+ }
+
+ file.open(1, false);
+
+ buffer.rewind();
+
+ file.writeDirect(buffer, true);
+
+ file.close();
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ startCompact();
+ finishCompact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ private void add(ByteBuffer buffer, int fileID, long id, byte[] record)
+ {
+ int pos = buffer.position();
+
+ buffer.put(JournalImpl.ADD_RECORD);
+
+ buffer.putInt(fileID);
+
+ buffer.putLong(id);
+
+ buffer.putInt(record.length);
+
+ buffer.put((byte)0);
+
+ buffer.put(record);
+
+ buffer.putInt(buffer.position() - pos + DataConstants.SIZE_INT);
+
+ records.add(new RecordInfo(id, (byte)0, record, false, (short)0));
+ }
+
+ private void update(ByteBuffer buffer, int fileID, long id, byte[] record)
+ {
+ int pos = buffer.position();
+
+ buffer.put(JournalImpl.UPDATE_RECORD);
+
+ buffer.putInt(fileID);
+
+ buffer.putLong(id);
+
+ buffer.putInt(record.length);
+
+ buffer.put((byte)0);
+
+ buffer.put(record);
+
+ buffer.putInt(buffer.position() - pos + DataConstants.SIZE_INT);
+
+ records.add(new RecordInfo(id, (byte)0, record, true, (short)0));
+
+ }
+
+ /**
+ * @param buffer
+ */
+ private void initHeader(ByteBuffer buffer, int fileID)
+ {
+ buffer.putInt(1);
+
+ buffer.putInt(0);
+
+ buffer.putLong(fileID);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
+ */
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ return new NIOSequentialFileFactory(getTestDir());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -15,15 +15,19 @@
import java.io.File;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -41,7 +45,6 @@
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.OrderedExecutorFactory;
import org.hornetq.utils.SimpleIDGenerator;
-import org.hornetq.utils.concurrent.LinkedBlockingDeque;
/**
* A SoakJournal
@@ -56,6 +59,8 @@
public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
private static final int MAX_WRITES = 20000;
+
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
// We want to maximize the difference between appends and deles, or we could get out of memory
public Semaphore maxRecords;
@@ -82,6 +87,12 @@
Executor testExecutor;
+ protected long getTotalTimeMilliseconds()
+ {
+ return TimeUnit.MINUTES.toMillis(2);
+ }
+
+
@Override
public void setUp() throws Exception
{
@@ -114,7 +125,7 @@
journal = new JournalImpl(50 * 1024,
20,
- 15,
+ 50,
ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE,
factory,
"hornetq-data",
@@ -181,11 +192,6 @@
threadPool.shutdown();
}
- protected long getTotalTimeMilliseconds()
- {
- return TimeUnit.MINUTES.toMillis(10);
- }
-
public void testAppend() throws Exception
{
@@ -225,6 +231,12 @@
", liveRecords = " +
(numberOfRecords.get() - numberOfDeletes.get()));
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+ rwLock.writeLock().lock();
+ System.out.println("Restarting server");
+ journal.stop();
+ journal.start();
+ reloadJournal();
+ rwLock.writeLock().unlock();
}
running = false;
@@ -255,12 +267,36 @@
latchExecutorDone.await();
- assertEquals(0, errors.get());
-
journal.stop();
journal.start();
+ reloadJournal();
+
+ Collection<Long> records = journal.getRecords().keySet();
+
+ System.out.println("Deleting everything!");
+ for (Long delInfo : records)
+ {
+ journal.appendDeleteRecord(delInfo, false);
+ }
+
+ journal.forceMoveNextFile();
+
+ Thread.sleep(5000);
+
+ assertEquals(0, journal.getDataFilesCount());
+
+ journal.stop();
+ }
+
+ /**
+ * @throws Exception
+ */
+ private void reloadJournal() throws Exception
+ {
+ assertEquals(0, errors.get());
+
ArrayList<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
ArrayList<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
journal.load(committedRecords, preparedTransactions, new TransactionFailureCallback()
@@ -285,8 +321,6 @@
}
assertEquals(numberOfRecords.get() - numberOfDeletes.get(), appends);
-
- journal.stop();
}
private byte[] generateRecord()
@@ -313,9 +347,10 @@
@Override
public void run()
{
+ rwLock.readLock().lock();
+
try
{
-
while (running)
{
final int txSize = RandomUtil.randomMax(100);
@@ -358,6 +393,14 @@
}
}
});
+
+ rwLock.readLock().unlock();
+
+ Thread.yield();
+
+ rwLock.readLock().lock();
+
+
}
}
catch (Exception e)
@@ -366,6 +409,10 @@
running = false;
errors.incrementAndGet();
}
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
}
}
@@ -384,6 +431,9 @@
@Override
public void run()
{
+
+ rwLock.readLock().lock();
+
try
{
int txSize = RandomUtil.randomMax(100);
@@ -391,17 +441,30 @@
long ids[] = new long[txSize];
long txID = JournalCleanupCompactStressTest.idGen.generateID();
-
+
while (running)
{
- long id = queue.poll(60, TimeUnit.MINUTES);
- ids[txCount] = id;
- journal.appendUpdateRecordTransactional(txID, id, (byte)0, generateRecord());
- if (++txCount == txSize)
+ Long id = queue.poll(10, TimeUnit.SECONDS);
+ if (id != null)
{
- journal.appendCommitRecord(txID, true, ctx);
- ctx.executeOnCompletion(new DeleteTask(ids));
+ ids[txCount++] = id;
+ journal.appendUpdateRecordTransactional(txID, id, (byte)0, generateRecord());
+ }
+ if (txCount == txSize || id == null)
+ {
+ if (txCount > 0)
+ {
+ journal.appendCommitRecord(txID, true, ctx);
+ ctx.executeOnCompletion(new DeleteTask(ids));
+ }
+
+ rwLock.readLock().unlock();
+
+ Thread.yield();
+
+ rwLock.readLock().lock();
+
txCount = 0;
txSize = RandomUtil.randomMax(100);
txID = JournalCleanupCompactStressTest.idGen.generateID();
@@ -420,6 +483,10 @@
running = false;
errors.incrementAndGet();
}
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
}
}
@@ -434,14 +501,18 @@
public void done()
{
+ rwLock.readLock().lock();
numberOfUpdates.addAndGet(ids.length);
try
{
for (long id : ids)
{
- journal.appendDeleteRecord(id, false);
- maxRecords.release();
- numberOfDeletes.incrementAndGet();
+ if (id != 0)
+ {
+ journal.appendDeleteRecord(id, false);
+ maxRecords.release();
+ numberOfDeletes.incrementAndGet();
+ }
}
}
catch (Exception e)
@@ -451,6 +522,10 @@
running = false;
errors.incrementAndGet();
}
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
}
public void onError(final int errorCode, final String errorMessage)
@@ -473,6 +548,7 @@
@Override
public void run()
{
+ rwLock.readLock().lock();
try
{
while (running)
@@ -481,18 +557,22 @@
// Append
for (int i = 0; running & i < ids.length; i++)
{
- // System.out.println("append slow");
+ System.out.println("append slow");
ids[i] = JournalCleanupCompactStressTest.idGen.generateID();
maxRecords.acquire();
journal.appendAddRecord(ids[i], (byte)1, generateRecord(), true);
numberOfRecords.incrementAndGet();
+ rwLock.readLock().unlock();
+
Thread.sleep(TimeUnit.SECONDS.toMillis(50));
+
+ rwLock.readLock().lock();
}
// Delete
for (int i = 0; running & i < ids.length; i++)
{
- // System.out.println("Deleting");
+ System.out.println("Deleting");
maxRecords.release();
journal.appendDeleteRecord(ids[i], false);
numberOfDeletes.incrementAndGet();
@@ -504,6 +584,10 @@
e.printStackTrace();
System.exit(-1);
}
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
}
}
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -542,7 +542,7 @@
journalImpl.appendAddRecordTransactional(1l,
2l,
(byte)3,
- new SimpleEncoding(1900 - JournalImpl.SIZE_ADD_RECORD_TX, (byte)4));
+ new SimpleEncoding(1900 - JournalImpl.SIZE_ADD_RECORD_TX - 1, (byte)4));
journalImpl.appendCommitRecord(1l, false);
@@ -587,11 +587,11 @@
// jumping RecordType, FileId, TransactionID, RecordID, VariableSize,
// RecordType, RecordBody (that we know it is 1 )
- buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1);
+ buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1 + 1);
int posCheckSize = buffer.position();
- Assert.assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 1, buffer.getInt());
+ Assert.assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 2, buffer.getInt());
buffer.position(posCheckSize);
@@ -652,11 +652,11 @@
// jumping RecordType, FileId, TransactionID, RecordID, VariableSize,
// RecordType, RecordBody (that we know it is 1 )
- buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1);
+ buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1 + 1);
int posCheckSize = buffer.position();
- Assert.assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 1, buffer.getInt());
+ Assert.assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 2, buffer.getInt());
buffer.position(posCheckSize);
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -333,7 +333,7 @@
journal.appendAddRecord(element, (byte)0, record, sync);
- records.add(new RecordInfo(element, (byte)0, record, false));
+ records.add(new RecordInfo(element, (byte)0, record, false, (short)0));
}
journal.debugWait();
@@ -349,7 +349,7 @@
journal.appendUpdateRecord(element, (byte)0, updateRecord, sync);
- records.add(new RecordInfo(element, (byte)0, updateRecord, true));
+ records.add(new RecordInfo(element, (byte)0, updateRecord, true, (short)0));
}
journal.debugWait();
@@ -377,13 +377,13 @@
{
// SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length +
// SIZE_BYTE
- byte[] record = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX);
+ byte[] record = generateRecord(recordLength - (JournalImpl.SIZE_ADD_RECORD_TX + 1));
beforeJournalOperation();
journal.appendAddRecordTransactional(txID, element, (byte)0, record);
- tx.records.add(new RecordInfo(element, (byte)0, record, false));
+ tx.records.add(new RecordInfo(element, (byte)0, record, false, (short)0));
}
@@ -396,13 +396,13 @@
for (long element : arguments)
{
- byte[] updateRecord = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX);
+ byte[] updateRecord = generateRecord(recordLength - (JournalImpl.SIZE_ADD_RECORD_TX + 1));
beforeJournalOperation();
journal.appendUpdateRecordTransactional(txID, element, (byte)0, updateRecord);
- tx.records.add(new RecordInfo(element, (byte)0, updateRecord, true));
+ tx.records.add(new RecordInfo(element, (byte)0, updateRecord, true, (short)0));
}
journal.debugWait();
}
@@ -417,7 +417,7 @@
journal.appendDeleteRecordTransactional(txID, element);
- tx.deletes.add(new RecordInfo(element, (byte)0, null, true));
+ tx.deletes.add(new RecordInfo(element, (byte)0, null, true, (short)0));
}
journal.debugWait();
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -492,7 +492,8 @@
private int calculateRecordsPerFile(final int fileSize, final int alignment, int recordSize)
{
recordSize = calculateRecordSize(recordSize, alignment);
- return fileSize / recordSize;
+ int headerSize = calculateRecordSize(JournalImpl.SIZE_HEADER, alignment);
+ return (fileSize - headerSize) / recordSize;
}
/**
@@ -666,7 +667,9 @@
int addRecordsPerFile = calculateRecordsPerFile(10 * 1024,
journal.getAlignment(),
- JournalImpl.SIZE_ADD_RECORD + recordLength);
+ JournalImpl.SIZE_ADD_RECORD + 1 + recordLength);
+
+ System.out.println(JournalImpl.SIZE_ADD_RECORD + 1 + recordLength);
// Fills exactly 10 files
int initialNumberOfAddRecords = addRecordsPerFile * 10;
@@ -693,29 +696,11 @@
// Now delete half of them
- int deleteRecordsPerFile = calculateRecordsPerFile(10 * 1024,
- journal.getAlignment(),
- JournalImpl.SIZE_DELETE_RECORD);
-
for (int i = 0; i < initialNumberOfAddRecords / 2; i++)
{
delete(i);
}
- int numberOfFiles = calculateNumberOfFiles(10 * 1024,
- journal.getAlignment(),
- initialNumberOfAddRecords,
- JournalImpl.SIZE_ADD_RECORD + recordLength,
- initialNumberOfAddRecords / 2,
- JournalImpl.SIZE_DELETE_RECORD);
-
- if (initialNumberOfAddRecords / 2 % deleteRecordsPerFile == 0)
- {
- // The file is already full, next add would fix it
- numberOfFiles--;
- }
-
- Assert.assertEquals(numberOfFiles, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(initialNumberOfAddRecords / 2, journal.getIDMapSize());
@@ -726,16 +711,6 @@
add(initialNumberOfAddRecords + i);
}
- numberOfFiles = calculateNumberOfFiles(10 * 1024,
- journal.getAlignment(),
- initialNumberOfAddRecords,
- JournalImpl.SIZE_ADD_RECORD + recordLength,
- initialNumberOfAddRecords / 2,
- JournalImpl.SIZE_DELETE_RECORD,
- 10,
- JournalImpl.SIZE_ADD_RECORD + recordLength);
-
- Assert.assertEquals(numberOfFiles, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(initialNumberOfAddRecords / 2 + 10, journal.getIDMapSize());
@@ -786,7 +761,7 @@
public void testReclaimAddUpdateDeleteDifferentFiles1() throws Exception
{
// Make sure there is one record per file
- setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
+ setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + 1 + recordLength,
getAlignment()), true);
createJournal();
startJournal();
@@ -825,7 +800,7 @@
public void testReclaimAddUpdateDeleteDifferentFiles2() throws Exception
{
// Make sure there is one record per file
- setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
+ setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + 1 + recordLength,
getAlignment()), true);
createJournal();
@@ -1130,7 +1105,7 @@
// Make sure we move on to the next file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 2
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2); // in file 2
journal.debugWait();
@@ -1161,7 +1136,7 @@
// Make sure we move on to the next file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 3); // in file 4
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 3); // in file 4
List<String> files5 = fileFactory.listFiles(fileExtension);
@@ -1491,7 +1466,7 @@
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2);
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2);
// Move on to another file
@@ -1544,7 +1519,7 @@
// Make sure we move on to the next file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 1
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2); // in file 1
List<String> files2 = fileFactory.listFiles(fileExtension);
@@ -1565,7 +1540,7 @@
2,
recordLength,
1,
- JournalImpl.SIZE_COMMIT_RECORD) + 2, files3.size());
+ JournalImpl.SIZE_COMMIT_RECORD + 1) + 2, files3.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(calculateNumberOfFiles(fileSize,
@@ -1586,9 +1561,9 @@
2,
recordLength,
1,
- JournalImpl.SIZE_COMMIT_RECORD,
+ JournalImpl.SIZE_COMMIT_RECORD + 1,
1,
- JournalImpl.SIZE_DELETE_RECORD) + 2, files4.size());
+ JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(calculateNumberOfFiles(fileSize,
@@ -1596,9 +1571,9 @@
2,
recordLength,
1,
- JournalImpl.SIZE_COMMIT_RECORD,
+ JournalImpl.SIZE_COMMIT_RECORD + 1,
1,
- JournalImpl.SIZE_DELETE_RECORD), journal.getDataFilesCount());
+ JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@@ -1608,9 +1583,6 @@
List<String> files5 = fileFactory.listFiles(fileExtension);
- Assert.assertEquals(4, files5.size());
-
- Assert.assertEquals(2, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(2, journal.getIDMapSize());
@@ -1619,9 +1591,6 @@
List<String> files6 = fileFactory.listFiles(fileExtension);
- Assert.assertEquals(4, files6.size());
-
- Assert.assertEquals(2, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(2, journal.getIDMapSize());
@@ -1633,11 +1602,6 @@
startJournal();
loadAndCheck();
- List<String> files7 = fileFactory.listFiles(fileExtension);
-
- Assert.assertEquals(4, files7.size());
-
- Assert.assertEquals(2, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(2, journal.getIDMapSize());
@@ -1665,7 +1629,7 @@
// Make sure we move on to the next file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 1
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2); // in file 1
List<String> files2 = fileFactory.listFiles(fileExtension);
@@ -1678,14 +1642,12 @@
rollback(1); // in file 1
- List<String> files3 = fileFactory.listFiles(fileExtension);
-
Assert.assertEquals(calculateNumberOfFiles(fileSize,
journal.getAlignment(),
2,
recordLength,
1,
- JournalImpl.SIZE_ROLLBACK_RECORD) + 2, files3.size());
+ JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(calculateNumberOfFiles(fileSize,
@@ -1693,7 +1655,7 @@
2,
recordLength,
1,
- JournalImpl.SIZE_ROLLBACK_RECORD), journal.getDataFilesCount());
+ JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@@ -1706,9 +1668,9 @@
2,
recordLength,
1,
- JournalImpl.SIZE_ROLLBACK_RECORD,
+ JournalImpl.SIZE_ROLLBACK_RECORD + 1,
1,
- JournalImpl.SIZE_DELETE_RECORD) + 2, files4.size());
+ JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(calculateNumberOfFiles(fileSize,
@@ -1716,43 +1678,19 @@
2,
recordLength,
1,
- JournalImpl.SIZE_ROLLBACK_RECORD,
+ JournalImpl.SIZE_ROLLBACK_RECORD + 1,
1,
- JournalImpl.SIZE_DELETE_RECORD), journal.getDataFilesCount());
+ JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
// Move on to another file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 3); // in file 2
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 3); // in file 2
// (current
// file)
- List<String> files5 = fileFactory.listFiles(fileExtension);
-
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_ROLLBACK_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD,
- 1,
- recordLength) + 2, files5.size());
-
Assert.assertEquals(1, journal.getOpenedFilesCount());
-
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_ROLLBACK_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD,
- 1,
- recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@@ -1859,76 +1797,23 @@
EncodingSupport xid = new SimpleEncoding(10, (byte)0);
prepare(1, xid); // in file 1
- List<String> files3 = fileFactory.listFiles(fileExtension);
-
- Assert.assertEquals(3, files3.size());
-
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_PREPARE_RECORD), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
Assert.assertEquals(1, journal.getOpenedFilesCount());
delete(2); // in file 1
- List<String> files4 = fileFactory.listFiles(fileExtension);
-
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_PREPARE_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD) + 2, files4.size());
-
Assert.assertEquals(1, journal.getOpenedFilesCount());
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_PREPARE_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
// Move on to another file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 3); // in file 2
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD -1, 3); // in file 2
- List<String> files5 = fileFactory.listFiles(fileExtension);
-
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_PREPARE_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD,
- 1,
- recordLength) + 2, files5.size());
-
Assert.assertEquals(1, journal.getOpenedFilesCount());
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_PREPARE_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD,
- 1,
- recordLength), journal.getDataFilesCount());
-
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@@ -1943,7 +1828,7 @@
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 4); // in file 3
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD -1, 4); // in file 3
List<String> files7 = fileFactory.listFiles(fileExtension);
@@ -2415,7 +2300,7 @@
journal.appendAddRecord(i, (byte)0, record, false);
- records.add(new RecordInfo(i, (byte)0, record, false));
+ records.add(new RecordInfo(i, (byte)0, record, false, (short)0));
}
for (int i = 0; i < 100; i++)
@@ -2424,7 +2309,7 @@
journal.appendUpdateRecord(i, (byte)0, record, false);
- records.add(new RecordInfo(i, (byte)0, record, true));
+ records.add(new RecordInfo(i, (byte)0, record, true, (short)0));
}
for (int i = 0; i < 100; i++)
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java 2010-08-24 14:06:49 UTC (rev 9590)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java 2010-08-24 14:21:23 UTC (rev 9591)
@@ -22,6 +22,7 @@
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.Reclaimer;
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.util.UnitTestCase;
@@ -357,23 +358,6 @@
assertCantDelete(2);
}
- public void testCleanup() throws Exception
- {
- setup(3);
- setupPosNeg(0, 11, 0, 0, 0);
- setupPosNeg(1, 1, 10, 0, 0);
- setupPosNeg(2, 1, 0, 1, 0);
-
- reclaimer.scan(files);
-
- debugFiles();
-
- assertCantDelete(0);
- Assert.assertTrue(files[0].isNeedCleanup());
- assertCantDelete(1);
- assertCantDelete(2);
- }
-
public void testThreeFiles10() throws Exception
{
setup(3);
@@ -741,9 +725,7 @@
"]=" +
files[i].getPosCount() +
", canDelete = " +
- files[i].isCanReclaim() +
- ", cleanup = " +
- files[i].isNeedCleanup());
+ files[i].isCanReclaim());
for (int j = 0; j <= i; j++)
{
System.out.println("..." + files[i].getNegCount(files[j]));
@@ -1002,5 +984,31 @@
{
return totalDep;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalFile#getJournalVersion()
+ */
+ public int getJournalVersion()
+ {
+ return JournalImpl.FORMAT_VERSION;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalFile#getTotNeg()
+ */
+ public int getTotNeg()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalFile#setTotNeg(int)
+ */
+ public void setTotNeg(int totNeg)
+ {
+ // TODO Auto-generated method stub
+
+ }
}
}
More information about the hornetq-commits
mailing list