JBoss hornetq SVN: r9590 - trunk/tests/src/org/hornetq/tests/soak/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-24 10:06:49 -0400 (Tue, 24 Aug 2010)
New Revision: 9590
Modified:
trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
Log:
commit done by accident, reverting timeout
Modified: trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java 2010-08-24 12:52:32 UTC (rev 9589)
+++ trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java 2010-08-24 14:06:49 UTC (rev 9590)
@@ -39,7 +39,7 @@
protected long getTotalTimeMilliseconds()
{
- return TimeUnit.HOURS.toMillis(8);
+ return TimeUnit.HOURS.toMillis(2);
}
14 years, 5 months
JBoss hornetq SVN: r9589 - branches.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-24 08:52:32 -0400 (Tue, 24 Aug 2010)
New Revision: 9589
Added:
branches/Branch_2_1/
Log:
2.1 Branch being done off 2.1.2
Copied: branches/Branch_2_1 (from rev 9588, tags/HornetQ_2_1_2_Final)
14 years, 5 months
JBoss hornetq SVN: r9588 - in trunk: src/main/org/hornetq/core/journal/impl and 6 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-24 08:23:22 -0400 (Tue, 24 Aug 2010)
New Revision: 9588
Added:
trunk/tests/src/org/hornetq/tests/integration/journal/OldFormatTest.java
Removed:
trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/ant/
Modified:
trunk/src/main/org/hornetq/core/journal/RecordInfo.java
trunk/src/main/org/hornetq/core/journal/TestableJournal.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java
trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-493 - Cleanup is being removed and compact being improved
Modified: trunk/src/main/org/hornetq/core/journal/RecordInfo.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/RecordInfo.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/RecordInfo.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -23,7 +23,7 @@
*/
public class RecordInfo
{
- public RecordInfo(final long id, final byte userRecordType, final byte[] data, final boolean isUpdate)
+ public RecordInfo(final long id, final byte userRecordType, final byte[] data, final boolean isUpdate, final short compactCount)
{
this.id = id;
@@ -32,8 +32,15 @@
this.data = data;
this.isUpdate = isUpdate;
+
+ this.compactCount = compactCount;
}
+ /** How many times this record was compacted (up to 7 times).
+ After the record has been compacted 7 times, the count stays at 7,
+ as we only store up to 0x7 (binary 111) as part of the recordID. */
+ public final short compactCount;
+
public final long id;
public final byte userRecordType;
Modified: trunk/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/TestableJournal.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/TestableJournal.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -57,8 +57,6 @@
void compact() throws Exception;
- void cleanUp(final JournalFile file) throws Exception;
-
JournalFile getCurrentFile();
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -218,7 +218,7 @@
sequentialFile.open(1, false);
- currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++);
+ currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++, JournalImpl.FORMAT_VERSION);
JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID());
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -183,6 +183,8 @@
recordInfo.data.length +
",isUpdate@" +
recordInfo.isUpdate +
+ ",compactCount@" +
+ recordInfo.compactCount +
",data@" +
encode(recordInfo.data);
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -283,7 +283,7 @@
byte userRecordType = parseByte("userRecordType", properties);
boolean isUpdate = parseBoolean("isUpdate", properties);
byte[] data = parseEncoding("data", properties);
- return new RecordInfo(id, userRecordType, data, isUpdate);
+ return new RecordInfo(id, userRecordType, data, isUpdate, (short)0);
}
private static byte[] parseEncoding(String name, Properties properties) throws Exception
Deleted: trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -1,212 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.journal.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
-import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
-import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
-import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
-import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
-import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
-import org.hornetq.core.journal.impl.dataformat.JournalRollbackRecordTX;
-
-/**
- * A JournalCleaner
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class JournalCleaner extends AbstractJournalUpdateTask
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private final HashMap<Long, AtomicInteger> transactionCounter = new HashMap<Long, AtomicInteger>();
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
- /**
- * @param fileFactory
- * @param journal
- * @param nextOrderingID
- */
- protected JournalCleaner(final SequentialFileFactory fileFactory,
- final JournalImpl journal,
- final Set<Long> recordsSnapshot,
- final long nextOrderingID) throws Exception
- {
- super(fileFactory, journal, recordsSnapshot, nextOrderingID);
- openFile();
- }
-
- // Public --------------------------------------------------------
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#markAsDataFile(org.hornetq.core.journal.impl.JournalFile)
- */
- public void markAsDataFile(final JournalFile file)
- {
- // nothing to be done here
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadAddRecord(org.hornetq.core.journal.RecordInfo)
- */
- public void onReadAddRecord(final RecordInfo info) throws Exception
- {
- if (lookupRecord(info.id))
- {
- writeEncoder(new JournalAddRecord(true, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data)));
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadAddRecordTX(long, org.hornetq.core.journal.RecordInfo)
- */
- public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
- {
- if (lookupRecord(recordInfo.id))
- {
- incrementTransactionCounter(transactionID);
-
- writeEncoder(new JournalAddRecordTX(true,
- transactionID,
- recordInfo.id,
- recordInfo.getUserRecordType(),
- new ByteArrayEncoding(recordInfo.data)));
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadCommitRecord(long, int)
- */
- public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
- {
- int txcounter = getTransactionCounter(transactionID);
-
- writeEncoder(new JournalCompleteRecordTX(true, transactionID, null), txcounter);
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadDeleteRecord(long)
- */
- public void onReadDeleteRecord(final long recordID) throws Exception
- {
- writeEncoder(new JournalDeleteRecord(recordID));
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadDeleteRecordTX(long, org.hornetq.core.journal.RecordInfo)
- */
- public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
- {
- incrementTransactionCounter(transactionID);
-
- writeEncoder(new JournalDeleteRecordTX(transactionID, recordInfo.id, new ByteArrayEncoding(recordInfo.data)));
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadPrepareRecord(long, byte[], int)
- */
- public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
- {
- int txcounter = getTransactionCounter(transactionID);
-
- writeEncoder(new JournalCompleteRecordTX(false, transactionID, new ByteArrayEncoding(extraData)), txcounter);
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadRollbackRecord(long)
- */
- public void onReadRollbackRecord(final long transactionID) throws Exception
- {
- writeEncoder(new JournalRollbackRecordTX(transactionID));
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadUpdateRecord(org.hornetq.core.journal.RecordInfo)
- */
- public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
- {
- if (lookupRecord(recordInfo.id))
- {
- writeEncoder(new JournalAddRecord(false,
- recordInfo.id,
- recordInfo.userRecordType,
- new ByteArrayEncoding(recordInfo.data)));
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadUpdateRecordTX(long, org.hornetq.core.journal.RecordInfo)
- */
- public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
- {
- if (lookupRecord(recordInfo.id))
- {
- incrementTransactionCounter(transactionID);
-
- writeEncoder(new JournalAddRecordTX(false,
- transactionID,
- recordInfo.id,
- recordInfo.userRecordType,
- new ByteArrayEncoding(recordInfo.data)));
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected int incrementTransactionCounter(final long transactionID)
- {
- AtomicInteger counter = transactionCounter.get(transactionID);
- if (counter == null)
- {
- counter = new AtomicInteger(0);
- transactionCounter.put(transactionID, counter);
- }
-
- return counter.incrementAndGet();
- }
-
- protected int getTransactionCounter(final long transactionID)
- {
- AtomicInteger counter = transactionCounter.get(transactionID);
- if (counter == null)
- {
- return 0;
- }
- else
- {
- return counter.intValue();
- }
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -47,6 +47,11 @@
{
private static final Logger log = Logger.getLogger(JournalCompactor.class);
+
+ // We try to separate old records from new ones when doing the compacting;
+ // this is a split line.
+ // We will force a moveNextFiles when the compactCount is below COMPACT_SPLIT_LINE
+ private final short COMPACT_SPLIT_LINE = 2;
// Snapshot of transactions that were pending when the compactor started
private final Map<Long, PendingTransaction> pendingTransactions = new ConcurrentHashMap<Long, PendingTransaction>();
@@ -69,7 +74,7 @@
if (controlFile.exists())
{
- JournalFile file = new JournalFileImpl(controlFile, 0);
+ JournalFile file = new JournalFileImpl(controlFile, 0, JournalImpl.FORMAT_VERSION);
final ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -209,19 +214,65 @@
private void checkSize(final int size) throws Exception
{
+ checkSize(size, -1);
+ }
+
+ private void checkSize(final int size, final int compactCount) throws Exception
+ {
if (getWritingChannel() == null)
{
- openFile();
+ if (!checkCompact(compactCount))
+ {
+ // will need to open a file either way
+ openFile();
+ }
}
else
{
+ if (compactCount >= 0)
+ {
+ if (checkCompact(compactCount))
+ {
+ // The file was already moved on this case, no need to check for the size.
+ // otherwise we will also need to check for the size
+ return;
+ }
+ }
+
if (getWritingChannel().writerIndex() + size > getWritingChannel().capacity())
{
openFile();
}
}
}
+
+ int currentCount;
+ // This means we will need to split when the compactCount is below the watermark
+ boolean willNeedToSplit = false;
+ boolean splitted = false;
+ private boolean checkCompact(final int compactCount) throws Exception
+ {
+ if (compactCount >= COMPACT_SPLIT_LINE && !splitted)
+ {
+ willNeedToSplit = true;
+ }
+
+ if (willNeedToSplit && compactCount < COMPACT_SPLIT_LINE)
+ {
+ willNeedToSplit = false;
+ splitted = false;
+ openFile();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+
+
/**
* Replay pending counts that happened during compacting
*/
@@ -252,9 +303,10 @@
info.id,
info.getUserRecordType(),
new ByteArrayEncoding(info.data));
+ addRecord.setCompactCount((short)(info.compactCount + 1));
+
+ checkSize(addRecord.getEncodeSize(), info.compactCount);
- checkSize(addRecord.getEncodeSize());
-
writeEncoder(addRecord);
newRecords.put(info.id, new JournalRecord(currentFile, addRecord.getEncodeSize()));
@@ -273,7 +325,9 @@
info.getUserRecordType(),
new ByteArrayEncoding(info.data));
- checkSize(record.getEncodeSize());
+ record.setCompactCount((short)(info.compactCount + 1));
+
+ checkSize(record.getEncodeSize(), info.compactCount);
newTransaction.addPositive(currentFile, info.id, record.getEncodeSize());
@@ -395,7 +449,9 @@
info.userRecordType,
new ByteArrayEncoding(info.data));
- checkSize(updateRecord.getEncodeSize());
+ updateRecord.setCompactCount((short)(info.compactCount + 1));
+
+ checkSize(updateRecord.getEncodeSize(), info.compactCount);
JournalRecord newRecord = newRecords.get(info.id);
@@ -425,7 +481,9 @@
info.userRecordType,
new ByteArrayEncoding(info.data));
- checkSize(updateRecordTX.getEncodeSize());
+ updateRecordTX.setCompactCount((short)(info.compactCount + 1));
+
+ checkSize(updateRecordTX.getEncodeSize(), info.compactCount);
writeEncoder(updateRecordTX);
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -53,10 +53,6 @@
boolean isCanReclaim();
- void setNeedCleanup(boolean needCleanup);
-
- boolean isNeedCleanup();
-
long getOffset();
/** This is a field to identify that records on this file actually belong to the current file.
@@ -64,6 +60,8 @@
int getRecordID();
long getFileID();
+
+ int getJournalVersion();
SequentialFile getFile();
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -36,7 +36,7 @@
private final SequentialFile file;
private final long fileID;
-
+
private final int recordID;
private long offset;
@@ -47,19 +47,20 @@
private boolean canReclaim;
- private boolean needCleanup;
-
private AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
-
+ private final int version;
+
private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
- public JournalFileImpl(final SequentialFile file, final long fileID)
+ public JournalFileImpl(final SequentialFile file, final long fileID, final int version)
{
this.file = file;
this.fileID = fileID;
-
+
+ this.version = version;
+
this.recordID = (int)(fileID & (long)Integer.MAX_VALUE);
}
@@ -81,16 +82,6 @@
return canReclaim;
}
- public boolean isNeedCleanup()
- {
- return needCleanup;
- }
-
- public void setNeedCleanup(final boolean needCleanup)
- {
- this.needCleanup = needCleanup;
- }
-
public void setCanReclaim(final boolean canReclaim)
{
this.canReclaim = canReclaim;
@@ -119,6 +110,11 @@
}
}
+ public int getJournalVersion()
+ {
+ return version;
+ }
+
public boolean resetNegCount(final JournalFile file)
{
return negCounts.remove(file) != null;
@@ -148,7 +144,7 @@
{
return fileID;
}
-
+
public int getRecordID()
{
return recordID;
@@ -227,12 +223,10 @@
{
return liveBytes.get();
}
-
+
public int getTotalNegativeToOthers()
{
return totalNegativeToOthers.get();
}
-
-
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -90,7 +90,9 @@
private static final int STATE_LOADED = 2;
- private static final int FORMAT_VERSION = 1;
+ public static final int FORMAT_VERSION = 2;
+
+ private static final int COMPATIBLE_VERSIONS[] = new int[] {1};
// Static --------------------------------------------------------
@@ -400,9 +402,10 @@
try
{
- long fileID = readFileHeader(file);
+
+ JournalFileImpl jrnFile = readFileHeader(file);
- orderedFiles.add(new JournalFileImpl(file, fileID));
+ orderedFiles.add(jrnFile);
}
finally
{
@@ -501,6 +504,21 @@
continue;
}
+ short compactCount = 0;
+
+ if (file.getJournalVersion() >= 2)
+ {
+ if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_BYTE))
+ {
+ reader.markAsDataFile(file);
+
+ wholeFileBuffer.position(pos + 1);
+ continue;
+ }
+
+ compactCount = wholeFileBuffer.get();
+ }
+
long transactionID = 0;
if (JournalImpl.isTransaction(recordType))
@@ -604,7 +622,7 @@
variableSize = 0;
}
- int recordSize = JournalImpl.getRecordSize(recordType);
+ int recordSize = JournalImpl.getRecordSize(recordType, file.getJournalVersion());
// VI - this is completing V, We will validate the size at the end
// of the record,
@@ -678,13 +696,13 @@
{
case ADD_RECORD:
{
- reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false));
+ reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount));
break;
}
case UPDATE_RECORD:
{
- reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true));
+ reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true, compactCount));
break;
}
@@ -696,19 +714,19 @@
case ADD_RECORD_TX:
{
- reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
+ reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false, compactCount));
break;
}
case UPDATE_RECORD_TX:
{
- reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
+ reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true, compactCount));
break;
}
case DELETE_RECORD_TX:
{
- reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
+ reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true, compactCount));
break;
}
@@ -1695,6 +1713,7 @@
* <tr><td><b>Field Name</b></td><td><b>Size</b></td></tr>
* <tr><td>RecordType</td><td>Byte (1)</td></tr>
* <tr><td>FileID</td><td>Integer (4 bytes)</td></tr>
+ * <tr><td>Compactor Counter</td><td>1 byte</td></tr>
* <tr><td>TransactionID <i>(if record is transactional)</i></td><td>Long (8 bytes)</td></tr>
* <tr><td>RecordID</td><td>Long (8 bytes)</td></tr>
* <tr><td>BodySize(Add, update and delete)</td><td>Integer (4 bytes)</td></tr>
@@ -1710,6 +1729,7 @@
* <tr><td><b>Field Name</b></td><td><b>Size</b></td></tr>
* <tr><td>RecordType</td><td>Byte (1)</td></tr>
* <tr><td>FileID</td><td>Integer (4 bytes)</td></tr>
+ * <tr><td>Compactor Counter</td><td>1 byte</td></tr>
* <tr><td>TransactionID <i>(if record is transactional)</i></td><td>Long (8 bytes)</td></tr>
* <tr><td>ExtraDataLength (Prepares only)</td><td>Integer (4 bytes)</td></tr>
* <tr><td>Number Of Files (N)</td><td>Integer (4 bytes)</td></tr>
@@ -1778,7 +1798,7 @@
loadManager.addRecord(info);
- records.put(info.id, new JournalRecord(file, info.data.length + JournalImpl.SIZE_ADD_RECORD));
+ records.put(info.id, new JournalRecord(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1));
}
public void onReadUpdateRecord(final RecordInfo info) throws Exception
@@ -1797,7 +1817,7 @@
// have been deleted
// just leaving some updates in this file
- posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD);
+ posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact count
}
}
@@ -1847,7 +1867,7 @@
transactions.put(transactionID, tnp);
}
- tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX);
+ tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact count
}
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
@@ -2137,7 +2157,6 @@
if (file.isCanReclaim())
{
// File can be reclaimed or deleted
-
if (JournalImpl.trace)
{
JournalImpl.trace("Reclaiming file " + file);
@@ -2151,67 +2170,6 @@
addFreeFile(file, false);
}
}
-
- int nCleanup = 0;
- for (JournalFile file : dataFiles)
- {
- if (file.isNeedCleanup())
- {
- nCleanup++;
- }
- }
-
- if (compactMinFiles > 0)
- {
- if (nCleanup > 0 && needsCompact())
- {
- for (JournalFile file : dataFiles)
- {
- if (file.isNeedCleanup())
- {
- final JournalFile cleanupFile = file;
-
- if (compactorRunning.compareAndSet(false, true))
- {
- // The cleanup should happen rarely.
- // but when it happens it needs to use a different thread,
- // or opening new files or any other executor's usage will be blocked while the cleanUp is being
- // processed.
-
- compactorExecutor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- cleanUp(cleanupFile);
- }
- catch (Throwable e)
- {
- JournalImpl.log.warn(e.getMessage(), e);
- }
- finally
- {
- compactorRunning.set(false);
- if (autoReclaim)
- {
- scheduleReclaim();
- }
- }
- }
- });
- }
- return true;
- }
- else
- {
- // We only cleanup the first files
- // if a middle file needs cleanup it will be done through compacting
- break;
- }
- }
- }
- }
}
finally
{
@@ -2221,132 +2179,9 @@
return false;
}
- // This method is public for tests
- public synchronized void cleanUp(final JournalFile file) throws Exception
- {
- if (state != JournalImpl.STATE_LOADED)
- {
- return;
- }
+
+ int deleteme = 0;
- try
- {
- JournalCleaner cleaner = null;
- ArrayList<JournalFile> dependencies = new ArrayList<JournalFile>();
- compactingLock.writeLock().lock();
-
- try
- {
-
- if (JournalImpl.trace)
- {
- JournalImpl.trace("Cleaning up file " + file);
- }
- JournalImpl.log.debug("Cleaning up file " + file);
-
- if (file.getPosCount() == 0)
- {
- // nothing to be done
- return;
- }
-
- // We don't want this file to be reclaimed during the cleanup
- file.incPosCount();
-
- // The file will have all the deleted records removed, so all the NegCount towards the file being cleaned up
- // could be reset
- for (JournalFile jrnFile : dataFiles)
- {
- if (jrnFile.resetNegCount(file))
- {
- dependencies.add(jrnFile);
- jrnFile.incPosCount(); // this file can't be reclaimed while cleanup is being done
- }
- }
-
- currentFile.resetNegCount(file);
- currentFile.incPosCount();
- dependencies.add(currentFile);
-
- cleaner = new JournalCleaner(fileFactory, this, records.keySet(), file.getFileID());
- }
- finally
- {
- compactingLock.writeLock().unlock();
- }
-
- compactingLock.readLock().lock();
-
- try
- {
- JournalImpl.readJournalFile(fileFactory, file, cleaner);
- }
- catch (Throwable e)
- {
- log.warn("Error reading cleanup on " + file, e);
- throw new Exception("Error reading cleanup on " + file, e);
- }
-
- cleaner.flush();
-
- // pointcut for tests
- // We need to test concurrent updates on the journal, as the compacting is being performed.
- // Usually tests will use this to hold the compacting while other structures are being updated.
- onCompactDone();
-
- for (JournalFile jrnfile : dependencies)
- {
- jrnfile.decPosCount();
- }
- file.decPosCount();
-
- SequentialFile tmpFile = cleaner.currentFile.getFile();
- String tmpFileName = tmpFile.getFileName();
- String cleanedFileName = file.getFile().getFileName();
-
- SequentialFile controlFile = createControlFile(null, null, new Pair<String, String>(tmpFileName,
- cleanedFileName));
-
- SequentialFile returningFile = fileFactory.createSequentialFile(file.getFile().getFileName(), maxAIO);
-
- returningFile.renameTo(renameExtensionFile(tmpFileName, ".cmp") + ".tmp");
-
- tmpFile.renameTo(cleanedFileName);
-
- controlFile.delete();
-
- final JournalFile retJournalfile = new JournalFileImpl(returningFile, -1);
-
- if (trace)
- {
- trace("Adding free file back from cleanup" + retJournalfile);
- }
-
- filesExecutor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- addFreeFile(retJournalfile, true);
- }
- catch (Throwable e)
- {
- log.warn("Error reinitializing file " + file, e);
- }
-
- }
- });
-
- }
- finally
- {
- compactingLock.readLock().unlock();
- JournalImpl.log.debug("Clean up on file " + file + " done");
- }
-
- }
-
private boolean needsCompact() throws Exception
{
JournalFile[] dataFiles = getDataFiles();
@@ -2361,8 +2196,10 @@
long totalBytes = (long)dataFiles.length * (long)fileSize;
long compactMargin = (long)(totalBytes * compactPercentage);
+
+ boolean needCompact = (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles);
- return (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles);
+ return needCompact;
}
@@ -2842,7 +2679,7 @@
int position = initFileHeader(this.fileFactory, sf, userVersion, newFileID);
- JournalFile jf = new JournalFileImpl(sf, newFileID);
+ JournalFile jf = new JournalFileImpl(sf, newFileID, FORMAT_VERSION);
sf.position(position);
@@ -2888,7 +2725,7 @@
return recordType >= JournalImpl.ADD_RECORD && recordType <= JournalImpl.DELETE_RECORD_TX;
}
- private static int getRecordSize(final byte recordType)
+ private static int getRecordSize(final byte recordType, final int journalVersion)
{
// The record size (without the variable portion)
int recordSize = 0;
@@ -2927,7 +2764,14 @@
throw new IllegalStateException("Record other than expected");
}
- return recordSize;
+ if (journalVersion >= 2)
+ {
+ return recordSize + 1;
+ }
+ else
+ {
+ return recordSize;
+ }
}
/**
@@ -2935,17 +2779,30 @@
* @return
* @throws Exception
*/
- private long readFileHeader(SequentialFile file) throws Exception
+ private JournalFileImpl readFileHeader(SequentialFile file) throws Exception
{
ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
file.read(bb);
int journalVersion = bb.getInt();
-
+
if (journalVersion != FORMAT_VERSION)
{
- throw new HornetQException(HornetQException.IO_ERROR, "Journal files version mismatch");
+ boolean isCompatible = false;
+
+ for (int v : COMPATIBLE_VERSIONS)
+ {
+ if (v == journalVersion)
+ {
+ isCompatible = true;
+ }
+ }
+
+ if (!isCompatible)
+ {
+ throw new HornetQException(HornetQException.IO_ERROR, "Journal files version mismatch. You should export the data from the previous version and import it as explained on the user's manual");
+ }
}
int readUserVersion = bb.getInt();
@@ -2960,7 +2817,8 @@
fileFactory.releaseBuffer(bb);
bb = null;
- return fileID;
+
+ return new JournalFileImpl(file, fileID, journalVersion);
}
/**
@@ -3175,7 +3033,7 @@
sequentialFile.position(position);
}
- return new JournalFileImpl(sequentialFile, fileID);
+ return new JournalFileImpl(sequentialFile, fileID, FORMAT_VERSION);
}
/**
Modified: trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -53,8 +53,6 @@
JournalFile currentFile = files[i];
- currentFile.setNeedCleanup(false);
-
int posCount = currentFile.getPosCount();
int totNeg = 0;
@@ -101,18 +99,7 @@
{
Reclaimer.trace(currentFile + " Can't be reclaimed because " + file + " has negative values");
}
- file.setNeedCleanup(true);
-
- if (file.getTotalNegativeToOthers() == 0)
- {
- file.setNeedCleanup(true);
- }
- else
- {
- // This file can't be cleared as the file has negatives to other files as well
- file.setNeedCleanup(false);
- }
-
+
currentFile.setCanReclaim(false);
break;
Modified: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -66,6 +66,8 @@
}
buffer.writeInt(fileID);
+
+ buffer.writeByte(compactCount);
buffer.writeLong(id);
@@ -81,6 +83,6 @@
@Override
public int getEncodeSize()
{
- return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize();
+ return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize() + 1;
}
}
\ No newline at end of file
Modified: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -75,6 +75,8 @@
}
buffer.writeInt(fileID);
+
+ buffer.writeByte(compactCount);
buffer.writeLong(txID);
@@ -92,6 +94,6 @@
@Override
public int getEncodeSize()
{
- return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize();
+ return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize() + 1;
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -70,6 +70,8 @@
}
buffer.writeInt(fileID);
+
+ buffer.writeByte(compactCount);
buffer.writeLong(txID);
@@ -105,11 +107,11 @@
{
if (isCommit)
{
- return JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD;
+ return JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD + 1;
}
else
{
- return JournalImpl.SIZE_PREPARE_RECORD + (transactionData != null ? transactionData.getEncodeSize() : 0);
+ return JournalImpl.SIZE_PREPARE_RECORD + (transactionData != null ? transactionData.getEncodeSize() : 0) + 1;
}
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -44,6 +44,8 @@
buffer.writeByte(JournalImpl.DELETE_RECORD);
buffer.writeInt(fileID);
+
+ buffer.writeByte(compactCount);
buffer.writeLong(id);
@@ -53,6 +55,6 @@
@Override
public int getEncodeSize()
{
- return JournalImpl.SIZE_DELETE_RECORD;
+ return JournalImpl.SIZE_DELETE_RECORD + 1;
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -55,6 +55,8 @@
buffer.writeByte(JournalImpl.DELETE_RECORD_TX);
buffer.writeInt(fileID);
+
+ buffer.writeByte(compactCount);
buffer.writeLong(txID);
@@ -73,6 +75,6 @@
@Override
public int getEncodeSize()
{
- return JournalImpl.SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0);
+ return JournalImpl.SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0) + 1;
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalInternalRecord.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -27,6 +27,8 @@
{
protected int fileID;
+
+ protected byte compactCount;
public int getFileID()
{
@@ -50,6 +52,23 @@
{
return 0;
}
+
+ public short getCompactCount()
+ {
+ return compactCount;
+ }
+
+ public void setCompactCount(final short compactCount)
+ {
+ if (compactCount > Byte.MAX_VALUE)
+ {
+ this.compactCount = Byte.MAX_VALUE;
+ }
+ else
+ {
+ this.compactCount = (byte)compactCount;
+ }
+ }
public abstract int getEncodeSize();
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -39,14 +39,15 @@
{
buffer.writeByte(JournalImpl.ROLLBACK_RECORD);
buffer.writeInt(fileID);
+ buffer.writeByte(compactCount);
buffer.writeLong(txID);
- buffer.writeInt(JournalImpl.SIZE_ROLLBACK_RECORD);
+ buffer.writeInt(JournalImpl.SIZE_ROLLBACK_RECORD + 1);
}
@Override
public int getEncodeSize()
{
- return JournalImpl.SIZE_ROLLBACK_RECORD;
+ return JournalImpl.SIZE_ROLLBACK_RECORD + 1;
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -39,7 +39,6 @@
import org.hornetq.utils.IDGenerator;
import org.hornetq.utils.SimpleIDGenerator;
import org.hornetq.utils.TimeAndCounterIDGenerator;
-import org.hornetq.utils.ReusableLatch;
/**
*
@@ -66,7 +65,7 @@
for (int i = 0; i < 5; i++)
{
SequentialFile file = fileFactory.createSequentialFile("file-" + i + ".tst", 1);
- dataFiles.add(new JournalFileImpl(file, 0));
+ dataFiles.add(new JournalFileImpl(file, 0, JournalImpl.FORMAT_VERSION));
}
ArrayList<JournalFile> newFiles = new ArrayList<JournalFile>();
@@ -74,7 +73,7 @@
for (int i = 0; i < 3; i++)
{
SequentialFile file = fileFactory.createSequentialFile("file-" + i + ".tst.new", 1);
- newFiles.add(new JournalFileImpl(file, 0));
+ newFiles.add(new JournalFileImpl(file, 0, JournalImpl.FORMAT_VERSION));
}
ArrayList<Pair<String, String>> renames = new ArrayList<Pair<String, String>>();
@@ -806,33 +805,8 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final ReusableLatch reusableLatchDone = new ReusableLatch();
- reusableLatchDone.countUp();
- final ReusableLatch reusableLatchWait = new ReusableLatch();
- reusableLatchWait.countUp();
-
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
- {
-
- @Override
- public void onCompactDone()
- {
- reusableLatchDone.countDown();
- System.out.println("Waiting on Compact");
- try
- {
- reusableLatchWait.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("Done");
- }
- };
-
- journal.setAutoReclaim(false);
-
+ createJournal();
+
startJournal();
load();
@@ -844,26 +818,8 @@
long addedRecord = idGen.generateID();
- Thread tCompact = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.compact();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
+ startCompact();
- tCompact.start();
-
- reusableLatchDone.await();
-
addTx(consumerTX, firstID);
addTx(appendTX, addedRecord);
@@ -876,10 +832,8 @@
delete(addedRecord);
- reusableLatchWait.countDown();
+ finishCompact();
- tCompact.join();
-
journal.forceMoveNextFile();
long newRecord = idGen.generateID();
@@ -959,52 +913,11 @@
setup(2, 60 * 1024, false);
- final ReusableLatch reusableLatchDone = new ReusableLatch();
- reusableLatchDone.countUp();
- final ReusableLatch reusableLatchWait = new ReusableLatch();
- reusableLatchWait.countUp();
+ createJournal();
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
- {
-
- @Override
- public void onCompactDone()
- {
- reusableLatchDone.countDown();
- System.out.println("Waiting on Compact");
- try
- {
- reusableLatchWait.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("Done");
- }
- };
-
- journal.setAutoReclaim(false);
-
startJournal();
load();
- Thread tCompact = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.cleanUp(journal.getDataFiles()[0]);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
-
for (int i = 0; i < 100; i++)
{
add(i);
@@ -1017,20 +930,16 @@
delete(i);
}
- tCompact.start();
-
- reusableLatchDone.await();
-
+ startCompact();
+
// Delete part of the live records while cleanup still working
for (int i = 1; i < 5; i++)
{
delete(i);
}
- reusableLatchWait.countDown();
-
- tCompact.join();
-
+ finishCompact();
+
// Delete part of the live records after cleanup is done
for (int i = 5; i < 10; i++)
{
@@ -1054,53 +963,12 @@
setup(2, 60 * 1024, false);
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+ createJournal();
- final ReusableLatch reusableLatchDone = new ReusableLatch();
- reusableLatchDone.countUp();
- final ReusableLatch reusableLatchWait = new ReusableLatch();
- reusableLatchWait.countUp();
-
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
- {
-
- @Override
- public void onCompactDone()
- {
- reusableLatchDone.countDown();
- System.out.println("Waiting on Compact");
- try
- {
- reusableLatchWait.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("Done");
- }
- };
-
- journal.setAutoReclaim(false);
-
startJournal();
load();
- Thread tCompact = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.compact();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
-
long appendTX = idGen.generateID();
long appendOne = idGen.generateID();
long appendTwo = idGen.generateID();
@@ -1109,9 +977,8 @@
addTx(appendTX, appendOne);
- tCompact.start();
- reusableLatchDone.await();
-
+ startCompact();
+
addTx(appendTX, appendTwo);
commit(appendTX);
@@ -1122,8 +989,7 @@
commit(updateTX);
// delete(appendTwo);
- reusableLatchWait.countDown();
- tCompact.join();
+ finishCompact();
journal.compact();
@@ -1239,13 +1105,13 @@
long id = idGenerator.generateID();
listToDelete.add(id);
- expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD);
+ expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD + 1);
add(id);
journal.forceMoveNextFile();
update(id);
- expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD);
+ expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD + 1);
journal.forceMoveNextFile();
}
@@ -1295,7 +1161,55 @@
}
}
+
+ public void testCompactFirstFileWithPendingCommits() throws Exception
+ {
+ setup(2, 60 * 1024, true);
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ long tx = idGenerator.generateID();
+ for (int i = 0; i < 10; i++)
+ {
+ addTx(tx, idGenerator.generateID());
+ }
+
+ journal.forceMoveNextFile();
+ commit(tx);
+
+
+ ArrayList<Long> listToDelete = new ArrayList<Long>();
+ for (int i = 0; i < 10; i++)
+ {
+ long id = idGenerator.generateID();
+ listToDelete.add(id);
+ add(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ for (Long id : listToDelete)
+ {
+ delete(id);
+ }
+
+ journal.forceMoveNextFile();
+
+ // This operation used to be journal.cleanup(journal.getDataFiles()[0]); when cleanup was still in place
+ journal.compact();
+
+ journal.checkReclaimStatus();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
public void testLiveSizeTransactional() throws Exception
{
setup(2, 60 * 1024, true);
Added: trunk/tests/src/org/hornetq/tests/integration/journal/OldFormatTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/OldFormatTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/OldFormatTest.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.journal;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A OldFormatTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class OldFormatTest extends JournalImplTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // This will generate records using the Version 1 format, and reading at the current version
+ public void testFormatOne() throws Exception
+ {
+ setup(2, 100 * 1024, true);
+
+ SequentialFile file = fileFactory.createSequentialFile("hq-1.hq", 1);
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(100 * 1024);
+
+ initHeader(buffer, 1);
+
+ byte[] record = new byte[1];
+
+ for (long i = 0 ; i < 10; i++)
+ {
+ add(buffer, 1, i, record);
+
+ update(buffer, 1, i, record);
+ }
+
+ file.open(1, false);
+
+ buffer.rewind();
+
+ file.writeDirect(buffer, true);
+
+ file.close();
+
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ startCompact();
+ finishCompact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ private void add(ByteBuffer buffer, int fileID, long id, byte[] record)
+ {
+ int pos = buffer.position();
+
+ buffer.put(JournalImpl.ADD_RECORD);
+
+ buffer.putInt(fileID);
+
+ buffer.putLong(id);
+
+ buffer.putInt(record.length);
+
+ buffer.put((byte)0);
+
+ buffer.put(record);
+
+ buffer.putInt(buffer.position() - pos + DataConstants.SIZE_INT);
+
+ records.add(new RecordInfo(id, (byte)0, record, false, (short)0));
+ }
+
+ private void update(ByteBuffer buffer, int fileID, long id, byte[] record)
+ {
+ int pos = buffer.position();
+
+ buffer.put(JournalImpl.UPDATE_RECORD);
+
+ buffer.putInt(fileID);
+
+ buffer.putLong(id);
+
+ buffer.putInt(record.length);
+
+ buffer.put((byte)0);
+
+ buffer.put(record);
+
+ buffer.putInt(buffer.position() - pos + DataConstants.SIZE_INT);
+
+ records.add(new RecordInfo(id, (byte)0, record, true, (short)0));
+
+ }
+
+ /**
+ * @param buffer
+ */
+ private void initHeader(ByteBuffer buffer, int fileID)
+ {
+ buffer.putInt(1);
+
+ buffer.putInt(0);
+
+ buffer.putLong(fileID);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
+ */
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ return new NIOSequentialFileFactory(getTestDir());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/tests/src/org/hornetq/tests/soak/journal/JournalCleanupCompactSoakTest.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -39,7 +39,7 @@
protected long getTotalTimeMilliseconds()
{
- return TimeUnit.HOURS.toMillis(2);
+ return TimeUnit.HOURS.toMillis(8);
}
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -15,6 +15,7 @@
import java.io.File;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@@ -25,6 +26,8 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -56,6 +59,8 @@
public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
private static final int MAX_WRITES = 20000;
+
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
// We want to maximize the difference between appends and deles, or we could get out of memory
public Semaphore maxRecords;
@@ -82,6 +87,12 @@
Executor testExecutor;
+ protected long getTotalTimeMilliseconds()
+ {
+ return TimeUnit.MINUTES.toMillis(2);
+ }
+
+
@Override
public void setUp() throws Exception
{
@@ -114,7 +125,7 @@
journal = new JournalImpl(50 * 1024,
20,
- 15,
+ 50,
ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE,
factory,
"hornetq-data",
@@ -181,11 +192,6 @@
threadPool.shutdown();
}
- protected long getTotalTimeMilliseconds()
- {
- return TimeUnit.MINUTES.toMillis(10);
- }
-
public void testAppend() throws Exception
{
@@ -225,6 +231,12 @@
", liveRecords = " +
(numberOfRecords.get() - numberOfDeletes.get()));
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+ rwLock.writeLock().lock();
+ System.out.println("Restarting server");
+ journal.stop();
+ journal.start();
+ reloadJournal();
+ rwLock.writeLock().unlock();
}
running = false;
@@ -255,12 +267,36 @@
latchExecutorDone.await();
- assertEquals(0, errors.get());
-
journal.stop();
journal.start();
+ reloadJournal();
+
+ Collection<Long> records = journal.getRecords().keySet();
+
+ System.out.println("Deleting everything!");
+ for (Long delInfo : records)
+ {
+ journal.appendDeleteRecord(delInfo, false);
+ }
+
+ journal.forceMoveNextFile();
+
+ Thread.sleep(5000);
+
+ assertEquals(0, journal.getDataFilesCount());
+
+ journal.stop();
+ }
+
+ /**
+ * @throws Exception
+ */
+ private void reloadJournal() throws Exception
+ {
+ assertEquals(0, errors.get());
+
ArrayList<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
ArrayList<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
journal.load(committedRecords, preparedTransactions, new TransactionFailureCallback()
@@ -285,8 +321,6 @@
}
assertEquals(numberOfRecords.get() - numberOfDeletes.get(), appends);
-
- journal.stop();
}
private byte[] generateRecord()
@@ -313,9 +347,10 @@
@Override
public void run()
{
+ rwLock.readLock().lock();
+
try
{
-
while (running)
{
final int txSize = RandomUtil.randomMax(100);
@@ -358,6 +393,14 @@
}
}
});
+
+ rwLock.readLock().unlock();
+
+ Thread.yield();
+
+ rwLock.readLock().lock();
+
+
}
}
catch (Exception e)
@@ -366,6 +409,10 @@
running = false;
errors.incrementAndGet();
}
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
}
}
@@ -384,6 +431,9 @@
@Override
public void run()
{
+
+ rwLock.readLock().lock();
+
try
{
int txSize = RandomUtil.randomMax(100);
@@ -391,17 +441,30 @@
long ids[] = new long[txSize];
long txID = JournalCleanupCompactStressTest.idGen.generateID();
-
+
while (running)
{
- long id = queue.poll(60, TimeUnit.MINUTES);
- ids[txCount] = id;
- journal.appendUpdateRecordTransactional(txID, id, (byte)0, generateRecord());
- if (++txCount == txSize)
+ Long id = queue.poll(10, TimeUnit.SECONDS);
+ if (id != null)
{
- journal.appendCommitRecord(txID, true, ctx);
- ctx.executeOnCompletion(new DeleteTask(ids));
+ ids[txCount++] = id;
+ journal.appendUpdateRecordTransactional(txID, id, (byte)0, generateRecord());
+ }
+ if (txCount == txSize || id == null)
+ {
+ if (txCount > 0)
+ {
+ journal.appendCommitRecord(txID, true, ctx);
+ ctx.executeOnCompletion(new DeleteTask(ids));
+ }
+
+ rwLock.readLock().unlock();
+
+ Thread.yield();
+
+ rwLock.readLock().lock();
+
txCount = 0;
txSize = RandomUtil.randomMax(100);
txID = JournalCleanupCompactStressTest.idGen.generateID();
@@ -420,6 +483,10 @@
running = false;
errors.incrementAndGet();
}
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
}
}
@@ -434,14 +501,18 @@
public void done()
{
+ rwLock.readLock().lock();
numberOfUpdates.addAndGet(ids.length);
try
{
for (long id : ids)
{
- journal.appendDeleteRecord(id, false);
- maxRecords.release();
- numberOfDeletes.incrementAndGet();
+ if (id != 0)
+ {
+ journal.appendDeleteRecord(id, false);
+ maxRecords.release();
+ numberOfDeletes.incrementAndGet();
+ }
}
}
catch (Exception e)
@@ -451,6 +522,10 @@
running = false;
errors.incrementAndGet();
}
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
}
public void onError(final int errorCode, final String errorMessage)
@@ -473,6 +548,7 @@
@Override
public void run()
{
+ rwLock.readLock().lock();
try
{
while (running)
@@ -481,18 +557,22 @@
// Append
for (int i = 0; running & i < ids.length; i++)
{
- // System.out.println("append slow");
+ System.out.println("append slow");
ids[i] = JournalCleanupCompactStressTest.idGen.generateID();
maxRecords.acquire();
journal.appendAddRecord(ids[i], (byte)1, generateRecord(), true);
numberOfRecords.incrementAndGet();
+ rwLock.readLock().unlock();
+
Thread.sleep(TimeUnit.SECONDS.toMillis(50));
+
+ rwLock.readLock().lock();
}
// Delete
for (int i = 0; running & i < ids.length; i++)
{
- // System.out.println("Deleting");
+ System.out.println("Deleting");
maxRecords.release();
journal.appendDeleteRecord(ids[i], false);
numberOfDeletes.incrementAndGet();
@@ -504,6 +584,10 @@
e.printStackTrace();
System.exit(-1);
}
+ finally
+ {
+ rwLock.readLock().unlock();
+ }
}
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -542,7 +542,7 @@
journalImpl.appendAddRecordTransactional(1l,
2l,
(byte)3,
- new SimpleEncoding(1900 - JournalImpl.SIZE_ADD_RECORD_TX, (byte)4));
+ new SimpleEncoding(1900 - JournalImpl.SIZE_ADD_RECORD_TX - 1, (byte)4));
journalImpl.appendCommitRecord(1l, false);
@@ -587,11 +587,11 @@
// jumping RecordType, FileId, TransactionID, RecordID, VariableSize,
// RecordType, RecordBody (that we know it is 1 )
- buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1);
+ buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1 + 1);
int posCheckSize = buffer.position();
- Assert.assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 1, buffer.getInt());
+ Assert.assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 2, buffer.getInt());
buffer.position(posCheckSize);
@@ -652,11 +652,11 @@
// jumping RecordType, FileId, TransactionID, RecordID, VariableSize,
// RecordType, RecordBody (that we know it is 1 )
- buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1);
+ buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1 + 1);
int posCheckSize = buffer.position();
- Assert.assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 1, buffer.getInt());
+ Assert.assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 2, buffer.getInt());
buffer.position(posCheckSize);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -333,7 +333,7 @@
journal.appendAddRecord(element, (byte)0, record, sync);
- records.add(new RecordInfo(element, (byte)0, record, false));
+ records.add(new RecordInfo(element, (byte)0, record, false, (short)0));
}
journal.debugWait();
@@ -349,7 +349,7 @@
journal.appendUpdateRecord(element, (byte)0, updateRecord, sync);
- records.add(new RecordInfo(element, (byte)0, updateRecord, true));
+ records.add(new RecordInfo(element, (byte)0, updateRecord, true, (short)0));
}
journal.debugWait();
@@ -377,13 +377,13 @@
{
// SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length +
// SIZE_BYTE
- byte[] record = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX);
+ byte[] record = generateRecord(recordLength - (JournalImpl.SIZE_ADD_RECORD_TX + 1));
beforeJournalOperation();
journal.appendAddRecordTransactional(txID, element, (byte)0, record);
- tx.records.add(new RecordInfo(element, (byte)0, record, false));
+ tx.records.add(new RecordInfo(element, (byte)0, record, false, (short)0));
}
@@ -396,13 +396,13 @@
for (long element : arguments)
{
- byte[] updateRecord = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX);
+ byte[] updateRecord = generateRecord(recordLength - (JournalImpl.SIZE_ADD_RECORD_TX + 1));
beforeJournalOperation();
journal.appendUpdateRecordTransactional(txID, element, (byte)0, updateRecord);
- tx.records.add(new RecordInfo(element, (byte)0, updateRecord, true));
+ tx.records.add(new RecordInfo(element, (byte)0, updateRecord, true, (short)0));
}
journal.debugWait();
}
@@ -417,7 +417,7 @@
journal.appendDeleteRecordTransactional(txID, element);
- tx.deletes.add(new RecordInfo(element, (byte)0, null, true));
+ tx.deletes.add(new RecordInfo(element, (byte)0, null, true, (short)0));
}
journal.debugWait();
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -492,7 +492,8 @@
private int calculateRecordsPerFile(final int fileSize, final int alignment, int recordSize)
{
recordSize = calculateRecordSize(recordSize, alignment);
- return fileSize / recordSize;
+ int headerSize = calculateRecordSize(JournalImpl.SIZE_HEADER, alignment);
+ return (fileSize - headerSize) / recordSize;
}
/**
@@ -666,7 +667,9 @@
int addRecordsPerFile = calculateRecordsPerFile(10 * 1024,
journal.getAlignment(),
- JournalImpl.SIZE_ADD_RECORD + recordLength);
+ JournalImpl.SIZE_ADD_RECORD + 1 + recordLength);
+
+ System.out.println(JournalImpl.SIZE_ADD_RECORD + 1 + recordLength);
// Fills exactly 10 files
int initialNumberOfAddRecords = addRecordsPerFile * 10;
@@ -693,29 +696,11 @@
// Now delete half of them
- int deleteRecordsPerFile = calculateRecordsPerFile(10 * 1024,
- journal.getAlignment(),
- JournalImpl.SIZE_DELETE_RECORD);
-
for (int i = 0; i < initialNumberOfAddRecords / 2; i++)
{
delete(i);
}
- int numberOfFiles = calculateNumberOfFiles(10 * 1024,
- journal.getAlignment(),
- initialNumberOfAddRecords,
- JournalImpl.SIZE_ADD_RECORD + recordLength,
- initialNumberOfAddRecords / 2,
- JournalImpl.SIZE_DELETE_RECORD);
-
- if (initialNumberOfAddRecords / 2 % deleteRecordsPerFile == 0)
- {
- // The file is already full, next add would fix it
- numberOfFiles--;
- }
-
- Assert.assertEquals(numberOfFiles, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(initialNumberOfAddRecords / 2, journal.getIDMapSize());
@@ -726,16 +711,6 @@
add(initialNumberOfAddRecords + i);
}
- numberOfFiles = calculateNumberOfFiles(10 * 1024,
- journal.getAlignment(),
- initialNumberOfAddRecords,
- JournalImpl.SIZE_ADD_RECORD + recordLength,
- initialNumberOfAddRecords / 2,
- JournalImpl.SIZE_DELETE_RECORD,
- 10,
- JournalImpl.SIZE_ADD_RECORD + recordLength);
-
- Assert.assertEquals(numberOfFiles, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(initialNumberOfAddRecords / 2 + 10, journal.getIDMapSize());
@@ -786,7 +761,7 @@
public void testReclaimAddUpdateDeleteDifferentFiles1() throws Exception
{
// Make sure there is one record per file
- setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
+ setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + 1 + recordLength,
getAlignment()), true);
createJournal();
startJournal();
@@ -825,7 +800,7 @@
public void testReclaimAddUpdateDeleteDifferentFiles2() throws Exception
{
// Make sure there is one record per file
- setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
+ setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + 1 + recordLength,
getAlignment()), true);
createJournal();
@@ -1130,7 +1105,7 @@
// Make sure we move on to the next file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 2
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2); // in file 2
journal.debugWait();
@@ -1161,7 +1136,7 @@
// Make sure we move on to the next file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 3); // in file 4
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 3); // in file 4
List<String> files5 = fileFactory.listFiles(fileExtension);
@@ -1491,7 +1466,7 @@
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2);
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2);
// Move on to another file
@@ -1544,7 +1519,7 @@
// Make sure we move on to the next file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 1
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2); // in file 1
List<String> files2 = fileFactory.listFiles(fileExtension);
@@ -1565,7 +1540,7 @@
2,
recordLength,
1,
- JournalImpl.SIZE_COMMIT_RECORD) + 2, files3.size());
+ JournalImpl.SIZE_COMMIT_RECORD + 1) + 2, files3.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(calculateNumberOfFiles(fileSize,
@@ -1586,9 +1561,9 @@
2,
recordLength,
1,
- JournalImpl.SIZE_COMMIT_RECORD,
+ JournalImpl.SIZE_COMMIT_RECORD + 1,
1,
- JournalImpl.SIZE_DELETE_RECORD) + 2, files4.size());
+ JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(calculateNumberOfFiles(fileSize,
@@ -1596,9 +1571,9 @@
2,
recordLength,
1,
- JournalImpl.SIZE_COMMIT_RECORD,
+ JournalImpl.SIZE_COMMIT_RECORD + 1,
1,
- JournalImpl.SIZE_DELETE_RECORD), journal.getDataFilesCount());
+ JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@@ -1608,9 +1583,6 @@
List<String> files5 = fileFactory.listFiles(fileExtension);
- Assert.assertEquals(4, files5.size());
-
- Assert.assertEquals(2, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(2, journal.getIDMapSize());
@@ -1619,9 +1591,6 @@
List<String> files6 = fileFactory.listFiles(fileExtension);
- Assert.assertEquals(4, files6.size());
-
- Assert.assertEquals(2, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(2, journal.getIDMapSize());
@@ -1633,11 +1602,6 @@
startJournal();
loadAndCheck();
- List<String> files7 = fileFactory.listFiles(fileExtension);
-
- Assert.assertEquals(4, files7.size());
-
- Assert.assertEquals(2, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(2, journal.getIDMapSize());
@@ -1665,7 +1629,7 @@
// Make sure we move on to the next file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 1
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 2); // in file 1
List<String> files2 = fileFactory.listFiles(fileExtension);
@@ -1678,14 +1642,12 @@
rollback(1); // in file 1
- List<String> files3 = fileFactory.listFiles(fileExtension);
-
Assert.assertEquals(calculateNumberOfFiles(fileSize,
journal.getAlignment(),
2,
recordLength,
1,
- JournalImpl.SIZE_ROLLBACK_RECORD) + 2, files3.size());
+ JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(calculateNumberOfFiles(fileSize,
@@ -1693,7 +1655,7 @@
2,
recordLength,
1,
- JournalImpl.SIZE_ROLLBACK_RECORD), journal.getDataFilesCount());
+ JournalImpl.SIZE_ROLLBACK_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@@ -1706,9 +1668,9 @@
2,
recordLength,
1,
- JournalImpl.SIZE_ROLLBACK_RECORD,
+ JournalImpl.SIZE_ROLLBACK_RECORD + 1,
1,
- JournalImpl.SIZE_DELETE_RECORD) + 2, files4.size());
+ JournalImpl.SIZE_DELETE_RECORD + 1) + 2, files4.size());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(calculateNumberOfFiles(fileSize,
@@ -1716,43 +1678,19 @@
2,
recordLength,
1,
- JournalImpl.SIZE_ROLLBACK_RECORD,
+ JournalImpl.SIZE_ROLLBACK_RECORD + 1,
1,
- JournalImpl.SIZE_DELETE_RECORD), journal.getDataFilesCount());
+ JournalImpl.SIZE_DELETE_RECORD + 1), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
// Move on to another file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 3); // in file 2
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD - 1, 3); // in file 2
// (current
// file)
- List<String> files5 = fileFactory.listFiles(fileExtension);
-
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_ROLLBACK_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD,
- 1,
- recordLength) + 2, files5.size());
-
Assert.assertEquals(1, journal.getOpenedFilesCount());
-
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_ROLLBACK_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD,
- 1,
- recordLength), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@@ -1859,76 +1797,23 @@
EncodingSupport xid = new SimpleEncoding(10, (byte)0);
prepare(1, xid); // in file 1
- List<String> files3 = fileFactory.listFiles(fileExtension);
-
- Assert.assertEquals(3, files3.size());
-
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_PREPARE_RECORD), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
Assert.assertEquals(1, journal.getOpenedFilesCount());
delete(2); // in file 1
- List<String> files4 = fileFactory.listFiles(fileExtension);
-
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_PREPARE_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD) + 2, files4.size());
-
Assert.assertEquals(1, journal.getOpenedFilesCount());
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_PREPARE_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD), journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
// Move on to another file
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 3); // in file 2
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD -1, 3); // in file 2
- List<String> files5 = fileFactory.listFiles(fileExtension);
-
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_PREPARE_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD,
- 1,
- recordLength) + 2, files5.size());
-
Assert.assertEquals(1, journal.getOpenedFilesCount());
- Assert.assertEquals(calculateNumberOfFiles(fileSize,
- journal.getAlignment(),
- 2,
- recordLength,
- 1,
- JournalImpl.SIZE_PREPARE_RECORD,
- 1,
- JournalImpl.SIZE_DELETE_RECORD,
- 1,
- recordLength), journal.getDataFilesCount());
-
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
@@ -1943,7 +1828,7 @@
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(1, journal.getIDMapSize());
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 4); // in file 3
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD -1, 4); // in file 3
List<String> files7 = fileFactory.listFiles(fileExtension);
@@ -2415,7 +2300,7 @@
journal.appendAddRecord(i, (byte)0, record, false);
- records.add(new RecordInfo(i, (byte)0, record, false));
+ records.add(new RecordInfo(i, (byte)0, record, false, (short)0));
}
for (int i = 0; i < 100; i++)
@@ -2424,7 +2309,7 @@
journal.appendUpdateRecord(i, (byte)0, record, false);
- records.add(new RecordInfo(i, (byte)0, record, true));
+ records.add(new RecordInfo(i, (byte)0, record, true, (short)0));
}
for (int i = 0; i < 100; i++)
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java 2010-08-24 12:16:12 UTC (rev 9587)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java 2010-08-24 12:23:22 UTC (rev 9588)
@@ -22,6 +22,7 @@
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.Reclaimer;
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.util.UnitTestCase;
@@ -357,23 +358,6 @@
assertCantDelete(2);
}
- public void testCleanup() throws Exception
- {
- setup(3);
- setupPosNeg(0, 11, 0, 0, 0);
- setupPosNeg(1, 1, 10, 0, 0);
- setupPosNeg(2, 1, 0, 1, 0);
-
- reclaimer.scan(files);
-
- debugFiles();
-
- assertCantDelete(0);
- Assert.assertTrue(files[0].isNeedCleanup());
- assertCantDelete(1);
- assertCantDelete(2);
- }
-
public void testThreeFiles10() throws Exception
{
setup(3);
@@ -741,9 +725,7 @@
"]=" +
files[i].getPosCount() +
", canDelete = " +
- files[i].isCanReclaim() +
- ", cleanup = " +
- files[i].isNeedCleanup());
+ files[i].isCanReclaim());
for (int j = 0; j <= i; j++)
{
System.out.println("..." + files[i].getNegCount(files[j]));
@@ -1002,5 +984,31 @@
{
return totalDep;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalFile#getJournalVersion()
+ */
+ public int getJournalVersion()
+ {
+ return JournalImpl.FORMAT_VERSION;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalFile#getTotNeg()
+ */
+ public int getTotNeg()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalFile#setTotNeg(int)
+ */
+ public void setTotNeg(int totNeg)
+ {
+ // TODO Auto-generated method stub
+
+ }
}
}
14 years, 5 months
JBoss hornetq SVN: r9587 - in branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration: persistence and 4 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-24 08:16:12 -0400 (Tue, 24 Aug 2010)
New Revision: 9587
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/persistence/DeleteQueueRestartTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/security/SecurityTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/server/LVQTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/ssl/CoreClientOverSSLTest.java
Log:
fix use of ServerLocator in tests
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2010-08-24 11:01:12 UTC (rev 9586)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2010-08-24 12:16:12 UTC (rev 9587)
@@ -147,15 +147,16 @@
try
{
ServerLocator locator = createInVMNonHALocator();
- ClientSessionFactory sf = locator.createSessionFactory();
// Making it synchronous, just because we want to stop sending messages as soon as the page-store becomes in
// page mode
// and we could only guarantee that by setting it to synchronous
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
- sf.getServerLocator().setBlockOnAcknowledge(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ ClientSessionFactory sf = locator.createSessionFactory();
+
ClientSession session = sf.createSession(null, null, false, !transacted, !transacted, false, 0);
session.createQueue(PageCrashTest.ADDRESS, PageCrashTest.ADDRESS, null, true);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/persistence/DeleteQueueRestartTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/persistence/DeleteQueueRestartTest.java 2010-08-24 11:01:12 UTC (rev 9586)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/persistence/DeleteQueueRestartTest.java 2010-08-24 12:16:12 UTC (rev 9587)
@@ -66,12 +66,13 @@
server.start();
ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setMinLargeMessageSize(1024 * 1024);
+
ClientSessionFactory factory = locator.createSessionFactory();
- factory.getServerLocator().setBlockOnDurableSend(true);
- factory.getServerLocator().setBlockOnNonDurableSend(true);
- factory.getServerLocator().setMinLargeMessageSize(1024 * 1024);
-
final ClientSession session = factory.createSession(false, true, true);
session.createQueue(DeleteQueueRestartTest.ADDRESS, DeleteQueueRestartTest.ADDRESS, true);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2010-08-24 11:01:12 UTC (rev 9586)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2010-08-24 12:16:12 UTC (rev 9587)
@@ -63,9 +63,9 @@
String address = RandomUtil.randomString();
String queue = RandomUtil.randomString();
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(getConnectorTransportConfiguration(true));
+ locator.setBlockOnNonDurableSend(false);
+ locator.setBlockOnDurableSend(false);
ClientSessionFactory csf = locator.createSessionFactory();
- csf.getServerLocator().setBlockOnNonDurableSend(false);
- csf.getServerLocator().setBlockOnDurableSend(false);
ClientSession session = null;
if (transactional)
{
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/security/SecurityTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/security/SecurityTest.java 2010-08-24 11:01:12 UTC (rev 9586)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/security/SecurityTest.java 2010-08-24 12:16:12 UTC (rev 9587)
@@ -501,8 +501,8 @@
roles.add(role);
securityRepository.addMatch(SecurityTest.addressA, roles);
securityManager.addRole("auser", "arole");
+ locator.setBlockOnNonDurableSend(true);
ClientSessionFactory cf = locator.createSessionFactory();
- cf.getServerLocator().setBlockOnNonDurableSend(true);
ClientSession session = cf.createSession("auser", "pass", false, true, true, false, -1);
session.createQueue(SecurityTest.addressA, SecurityTest.queueA, true);
ClientProducer cp = session.createProducer(SecurityTest.addressA);
@@ -535,8 +535,8 @@
roles.add(role);
securityRepository.addMatch(SecurityTest.addressA, roles);
securityManager.addRole("auser", "arole");
+ locator.setBlockOnNonDurableSend(true);
ClientSessionFactory cf = locator.createSessionFactory();
- cf.getServerLocator().setBlockOnNonDurableSend(true);
ClientSession session = cf.createSession("auser", "pass", false, true, true, false, -1);
session.createQueue(SecurityTest.addressA, SecurityTest.queueA, true);
ClientProducer cp = session.createProducer(SecurityTest.addressA);
@@ -940,8 +940,8 @@
roles.add(role);
securityRepository.addMatch(configuration.getManagementAddress().toString(), roles);
securityManager.addRole("auser", "arole");
+ locator.setBlockOnNonDurableSend(true);
ClientSessionFactory cf = locator.createSessionFactory();
- cf.getServerLocator().setBlockOnNonDurableSend(true);
ClientSession session = cf.createSession("auser", "pass", false, true, true, false, -1);
ClientProducer cp = session.createProducer(configuration.getManagementAddress());
cp.send(session.createMessage(false));
@@ -1169,9 +1169,9 @@
ClientSession andrewConnection = null;
ClientSession frankConnection = null;
ClientSession samConnection = null;
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
ClientSessionFactory factory = locator.createSessionFactory();
- factory.getServerLocator().setBlockOnNonDurableSend(true);
- factory.getServerLocator().setBlockOnDurableSend(true);
ClientSession adminSession = factory.createSession("all", "all", false, true, true, false, -1);
String genericQueueName = "genericQueue";
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/server/LVQTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/server/LVQTest.java 2010-08-24 11:01:12 UTC (rev 9586)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/server/LVQTest.java 2010-08-24 12:16:12 UTC (rev 9587)
@@ -582,9 +582,9 @@
server.getAddressSettingsRepository().addMatch(address.toString(), qs);
// then we create a client as normalServer
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ locator.setBlockOnAcknowledge(true);
+ locator.setAckBatchSize(0);
ClientSessionFactory sessionFactory = locator.createSessionFactory();
- sessionFactory.getServerLocator().setBlockOnAcknowledge(true);
- sessionFactory.getServerLocator().setAckBatchSize(0);
clientSession = sessionFactory.createSession(false, true, true);
clientSessionTxReceives = sessionFactory.createSession(false, true, false);
clientSessionTxSends = sessionFactory.createSession(false, false, true);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/ssl/CoreClientOverSSLTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/ssl/CoreClientOverSSLTest.java 2010-08-24 11:01:12 UTC (rev 9586)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/ssl/CoreClientOverSSLTest.java 2010-08-24 12:16:12 UTC (rev 9587)
@@ -114,8 +114,8 @@
tc.getParams().put(TransportConstants.SSL_ENABLED_PROP_NAME, false);
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(tc);
+ locator.setCallTimeout(2000);
ClientSessionFactory sf = locator.createSessionFactory();
- sf.getServerLocator().setCallTimeout(2000);
try
{
sf.createSession(false, true, true);
14 years, 5 months
JBoss hornetq SVN: r9586 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-24 07:01:12 -0400 (Tue, 24 Aug 2010)
New Revision: 9586
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/BroadcastGroupControlTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
Log:
fix management tests
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java 2010-08-24 09:23:32 UTC (rev 9585)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java 2010-08-24 11:01:12 UTC (rev 9586)
@@ -274,9 +274,9 @@
server.start();
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
ClientSessionFactory sf = locator.createSessionFactory();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnNonDurableSend(true);
session = sf.createSession(false, true, false);
session.start();
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java 2010-08-24 09:23:32 UTC (rev 9585)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/AddressControlUsingCoreTest.java 2010-08-24 11:01:12 UTC (rev 9586)
@@ -162,9 +162,9 @@
server.start();
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
ClientSessionFactory sf = locator.createSessionFactory();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnNonDurableSend(true);
session = sf.createSession(false, true, false);
session.start();
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/BroadcastGroupControlTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/BroadcastGroupControlTest.java 2010-08-24 09:23:32 UTC (rev 9585)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/BroadcastGroupControlTest.java 2010-08-24 11:01:12 UTC (rev 9586)
@@ -99,18 +99,15 @@
Object[] connectorPairs = broadcastGroupControl.getConnectorPairs();
Assert.assertEquals(1, connectorPairs.length);
- Object[] connectorPairData = (Object[])connectorPairs[0];
- Assert.assertEquals(broadcastGroupConfig.getConnectorInfos().get(0), connectorPairData[0]);
- Assert.assertEquals(broadcastGroupConfig.getConnectorInfos().get(1), connectorPairData[1]);
- fail("fix^^");
+ System.out.println(connectorPairs);
+ String connectorPairData = (String)connectorPairs[0];
+ Assert.assertEquals(broadcastGroupConfig.getConnectorInfos().get(0), connectorPairData);
String jsonString = broadcastGroupControl.getConnectorPairsAsJSON();
Assert.assertNotNull(jsonString);
JSONArray array = new JSONArray(jsonString);
Assert.assertEquals(1, array.length());
- JSONObject data = array.getJSONObject(0);
- Assert.assertEquals(broadcastGroupConfig.getConnectorInfos().get(0), data.optString("a"));
- Assert.assertEquals(broadcastGroupConfig.getConnectorInfos().get(1), data.optString("b", null));
-
+ Assert.assertEquals(broadcastGroupConfig.getConnectorInfos().get(0), array.getString(0));
+
Assert.assertTrue(broadcastGroupControl.isStarted());
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java 2010-08-24 09:23:32 UTC (rev 9585)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java 2010-08-24 11:01:12 UTC (rev 9586)
@@ -32,6 +32,7 @@
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.CoreQueueConfiguration;
+import org.hornetq.core.config.DiscoveryGroupConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
@@ -42,7 +43,6 @@
import org.hornetq.tests.integration.SimpleNotificationService;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.utils.json.JSONArray;
-import org.hornetq.utils.json.JSONObject;
/**
* A BridgeControlTest
@@ -90,18 +90,17 @@
clusterConnectionControl.isForwardWhenNoConsumers());
Assert.assertEquals(clusterConnectionConfig1.getMaxHops(), clusterConnectionControl.getMaxHops());
- Object[] connectorPairs = clusterConnectionControl.getStaticConnectors();
- Assert.assertEquals(1, connectorPairs.length);
- Object[] connectorPairData = (Object[])connectorPairs[0];
- Assert.assertEquals(clusterConnectionConfig1.getStaticConnectors().get(0), connectorPairData[0]);
+ Object[] connectors = clusterConnectionControl.getStaticConnectors();
+ Assert.assertEquals(1, connectors.length);
+ String connector = (String)connectors[0];
+ Assert.assertEquals(clusterConnectionConfig1.getStaticConnectors().get(0), connector);
String jsonString = clusterConnectionControl.getStaticConnectorsAsJSON();
Assert.assertNotNull(jsonString);
JSONArray array = new JSONArray(jsonString);
Assert.assertEquals(1, array.length());
- JSONObject data = array.getJSONObject(0);
- Assert.assertEquals(clusterConnectionConfig1.getStaticConnectors().get(0), data.optString("a"));
-
+ Assert.assertEquals(clusterConnectionConfig1.getStaticConnectors().get(0), array.getString(0));
+
Assert.assertNull(clusterConnectionControl.getDiscoveryGroupName());
Assert.assertTrue(clusterConnectionControl.isStarted());
@@ -203,7 +202,7 @@
false);
List<String> connectors = new ArrayList<String>();
connectors.add(connectorConfig.getName());
-
+
clusterConnectionConfig1 = new ClusterConnectionConfiguration(RandomUtil.randomString(),
queueConfig.getAddress(),
connectorConfig.getName(),
@@ -214,6 +213,9 @@
RandomUtil.randomPositiveInt(),
connectors);
+ String discoveryGroupName = RandomUtil.randomString();
+ DiscoveryGroupConfiguration discoveryGroupConfig = new DiscoveryGroupConfiguration(discoveryGroupName, null, "230.1.2.3", 6745, 500);
+
clusterConnectionConfig2 = new ClusterConnectionConfiguration(RandomUtil.randomString(),
queueConfig.getAddress(),
connectorConfig.getName(),
@@ -222,8 +224,8 @@
RandomUtil.randomBoolean(),
RandomUtil.randomPositiveInt(),
RandomUtil.randomPositiveInt(),
- RandomUtil.randomString());
-
+ discoveryGroupName);
+
Configuration conf_1 = new ConfigurationImpl();
conf_1.setSecurityEnabled(false);
conf_1.setJMXManagementEnabled(true);
@@ -239,6 +241,7 @@
conf_0.getConnectorConfigurations().put(connectorConfig.getName(), connectorConfig);
conf_0.getClusterConfigurations().add(clusterConnectionConfig1);
conf_0.getClusterConfigurations().add(clusterConnectionConfig2);
+ conf_0.getDiscoveryGroupConfigurations().put(discoveryGroupName, discoveryGroupConfig);
mbeanServer_1 = MBeanServerFactory.createMBeanServer();
server_1 = HornetQServers.newHornetQServer(conf_1, mbeanServer_1, false);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2010-08-24 09:23:32 UTC (rev 9585)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2010-08-24 11:01:12 UTC (rev 9586)
@@ -1282,9 +1282,9 @@
server.start();
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
ClientSessionFactory sf = locator.createSessionFactory();
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnNonDurableSend(true);
session = sf.createSession(false, true, false);
session.start();
}
14 years, 5 months
JBoss hornetq SVN: r9585 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-24 05:23:32 -0400 (Tue, 24 Aug 2010)
New Revision: 9585
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
Log:
fix JMS ConnectionFactory parsing
* when the CF uses discovery, set accordingly its discovery refresh timeout
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-08-24 08:37:37 UTC (rev 9584)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerConfigParserImpl.java 2010-08-24 09:23:32 UTC (rev 9585)
@@ -417,6 +417,7 @@
strbindings);
cfConfig.setLocalBindAddress(discoveryGroupConfiguration.getLocalBindAddress());
cfConfig.setInitialWaitTimeout(discoveryInitialWaitTimeout);
+ cfConfig.setDiscoveryRefreshTimeout(discoveryGroupConfiguration.getRefreshTimeout());
}
else
{
14 years, 5 months
JBoss hornetq SVN: r9584 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/http.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-24 04:37:37 -0400 (Tue, 24 Aug 2010)
New Revision: 9584
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/http/CoreClientOverHttpTest.java
Log:
fix CoreClientOverHttpTest
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/http/CoreClientOverHttpTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/http/CoreClientOverHttpTest.java 2010-08-24 08:22:54 UTC (rev 9583)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/http/CoreClientOverHttpTest.java 2010-08-24 08:37:37 UTC (rev 9584)
@@ -21,6 +21,8 @@
import org.hornetq.api.core.client.*;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
@@ -42,13 +44,13 @@
HashMap<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.HTTP_ENABLED_PROP_NAME, true);
- conf.getAcceptorConfigurations().add(new TransportConfiguration(UnitTestCase.NETTY_ACCEPTOR_FACTORY, params));
+ conf.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName(), params));
HornetQServer server = HornetQServers.newHornetQServer(conf, false);
server.start();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY, params));
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), params));
ClientSessionFactory sf = locator.createSessionFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -98,15 +100,15 @@
HashMap<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.HTTP_ENABLED_PROP_NAME, true);
- conf.getAcceptorConfigurations().add(new TransportConfiguration(UnitTestCase.NETTY_ACCEPTOR_FACTORY, params));
+ conf.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName(), params));
HornetQServer server = HornetQServers.newHornetQServer(conf, false);
server.start();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY, params));
+ ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), params));
+ locator.setConnectionTTL(500);
ClientSessionFactory sf = locator.createSessionFactory();
- sf.getServerLocator().setConnectionTTL(500);
ClientSession session = sf.createSession(false, true, true);
14 years, 5 months
JBoss hornetq SVN: r9583 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/divert.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-24 04:22:54 -0400 (Tue, 24 Aug 2010)
New Revision: 9583
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java
Log:
fix PersistentDivertTest
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java 2010-08-23 15:57:26 UTC (rev 9582)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java 2010-08-24 08:22:54 UTC (rev 9583)
@@ -106,12 +106,12 @@
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ locator.setBlockOnAcknowledge(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+
ClientSessionFactory sf = locator.createSessionFactory();
- sf.getServerLocator().setBlockOnAcknowledge(true);
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
-
ClientSession session = sf.createSession(true, true, 0);
final SimpleString queueName1 = new SimpleString("queue1");
@@ -310,12 +310,12 @@
messagingService.start();
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ locator.setBlockOnAcknowledge(true);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+
ClientSessionFactory sf = locator.createSessionFactory();
- sf.getServerLocator().setBlockOnAcknowledge(true);
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
-
ClientSession session = sf.createSession(true, true, 0);
final SimpleString queueName1 = new SimpleString("queue1");
@@ -363,10 +363,9 @@
messagingService.start();
ServerLocator locator2 = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
- sf = locator2.createSessionFactory();
+ locator2.setBlockOnDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
-
+ sf = locator2.createSessionFactory();
session = sf.createSession(false, true, true);
session.start();
@@ -460,11 +459,10 @@
messagingService.start();
ServerLocator locator3 = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ locator3.setBlockOnDurableSend(true);
sf = locator3.createSessionFactory();
- sf.getServerLocator().setBlockOnDurableSend(true);
-
session = sf.createSession(false, true, true);
consumer1 = session.createConsumer(queueName1);
14 years, 5 months
JBoss hornetq SVN: r9582 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-23 11:57:26 -0400 (Mon, 23 Aug 2010)
New Revision: 9582
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
Log:
fix HornetQConnectionFactoryTest
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java 2010-08-23 14:54:38 UTC (rev 9581)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java 2010-08-23 15:57:26 UTC (rev 9582)
@@ -23,19 +23,16 @@
import junit.framework.Assert;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
@@ -57,8 +54,6 @@
private HornetQServer liveService;
- private HornetQServer backupService;
-
private TransportConfiguration liveTC;
public void testDefaultConstructor() throws Exception
@@ -67,7 +62,7 @@
assertFactoryParams(cf,
null,
null,
- 0,
+ -1,
HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
null,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
@@ -126,7 +121,7 @@
assertFactoryParams(cf,
new TransportConfiguration[]{liveTC},
null,
- 0,
+ -1,
HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
null,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
@@ -211,7 +206,7 @@
assertFactoryParams(cf,
new TransportConfiguration[]{liveTC},
null,
- 0,
+ -1,
HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
null,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
@@ -248,56 +243,13 @@
}
- public void testStaticConnectorLiveAndBackupConstructor() throws Exception
- {
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(liveTC);
- assertFactoryParams(cf,
- new TransportConfiguration[]{liveTC},
- null,
- 0,
- HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
- null,
- HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- HornetQClient.DEFAULT_CONNECTION_TTL,
- HornetQClient.DEFAULT_CALL_TIMEOUT,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
- HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
- HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
- HornetQClient.DEFAULT_PRODUCER_MAX_RATE,
- HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
- HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND,
- HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND,
- HornetQClient.DEFAULT_AUTO_GROUP,
- HornetQClient.DEFAULT_PRE_ACKNOWLEDGE,
- HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
- HornetQClient.DEFAULT_ACK_BATCH_SIZE,
- HornetQClient.DEFAULT_ACK_BATCH_SIZE,
- HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT,
- HornetQClient.DEFAULT_USE_GLOBAL_POOLS,
- HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE,
- HornetQClient.DEFAULT_RETRY_INTERVAL,
- HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
- HornetQClient.DEFAULT_RECONNECT_ATTEMPTS,
- HornetQClient.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
- Connection conn = cf.createConnection();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- testSettersThrowException(cf);
-
- conn.close();
-
- }
-
public void testStaticConnectorLiveConstructor() throws Exception
{
HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(liveTC);
assertFactoryParams(cf,
new TransportConfiguration[]{liveTC},
null,
- 0,
+ -1,
HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT,
null,
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
@@ -337,8 +289,6 @@
{
HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(liveTC);
- String discoveryAddress = RandomUtil.randomString();
- int discoveryPort = RandomUtil.randomPositiveInt();
long discoveryRefreshTimeout = RandomUtil.randomPositiveLong();
long clientFailureCheckPeriod = RandomUtil.randomPositiveLong();
long connectionTTL = RandomUtil.randomPositiveLong();
@@ -387,8 +337,8 @@
cf.setReconnectAttempts(reconnectAttempts);
cf.setFailoverOnServerShutdown(failoverOnServerShutdown);
- Assert.assertEquals(discoveryAddress, cf.getDiscoveryAddress());
- Assert.assertEquals(discoveryPort, cf.getDiscoveryPort());
+ Assert.assertEquals(null, cf.getDiscoveryAddress());
+ Assert.assertEquals(-1, cf.getDiscoveryPort());
Assert.assertEquals(discoveryRefreshTimeout, cf.getDiscoveryRefreshTimeout());
Assert.assertEquals(clientFailureCheckPeriod, cf.getClientFailureCheckPeriod());
Assert.assertEquals(connectionTTL, cf.getConnectionTTL());
@@ -795,49 +745,26 @@
{
super.setUp();
- startLiveAndBackup();
+ startServer();
}
@Override
protected void tearDown() throws Exception
{
- stopLiveAndBackup();
+ if (liveService.isStarted())
+ {
+ liveService.stop();
+ }
liveService = null;
- backupService = null;
-
liveTC = null;
super.tearDown();
}
- private void stopLiveAndBackup() throws Exception
+ private void startServer() throws Exception
{
- if (liveService.isStarted())
- {
- liveService.stop();
- }
- if (backupService.isStarted())
- {
- backupService.stop();
- }
- }
-
- private void startLiveAndBackup() throws Exception
- {
- Map<String, Object> backupParams = new HashMap<String, Object>();
- Configuration backupConf = new ConfigurationImpl();
- backupConf.setSecurityEnabled(false);
- backupConf.setClustered(true);
- backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
- backupConf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory", backupParams));
- backupConf.setBackup(true);
- backupConf.setSharedStore(true);
- backupService = HornetQServers.newHornetQServer(backupConf, false);
- backupService.start();
-
Configuration liveConf = new ConfigurationImpl();
liveConf.setSecurityEnabled(false);
liveTC = new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory");
14 years, 5 months
JBoss hornetq SVN: r9581 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-23 10:54:38 -0400 (Mon, 23 Aug 2010)
New Revision: 9581
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
Log:
fix TemporaryQueueTest
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2010-08-23 14:52:52 UTC (rev 9580)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2010-08-23 14:54:38 UTC (rev 9581)
@@ -60,6 +60,8 @@
private ClientSessionFactory sf;
+ private ServerLocator locator;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -161,9 +163,12 @@
}
});
session.close();
+ sf.close();
// wait for the closing listeners to be fired
Assert.assertTrue("connection close listeners not fired", latch.await(2 * TemporaryQueueTest.CONNECTION_TTL,
TimeUnit.MILLISECONDS));
+
+ sf = locator.createSessionFactory();
session = sf.createSession(false, true, true);
session.start();
@@ -210,7 +215,8 @@
public void testDeleteTemporaryQueueWhenClientCrash() throws Exception
{
session.close();
-
+ sf.close();
+
final SimpleString queue = RandomUtil.randomSimpleString();
SimpleString address = RandomUtil.randomSimpleString();
@@ -296,7 +302,7 @@
server = createServer(false, configuration);
server.start();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
locator.setConnectionTTL(TemporaryQueueTest.CONNECTION_TTL);
sf = locator.createSessionFactory();
session = sf.createSession(false, true, true);
14 years, 5 months