[jboss-cvs] JBoss Messaging SVN: r5881 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Feb 17 10:59:21 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-02-17 10:59:21 -0500 (Tue, 17 Feb 2009)
New Revision: 5881
Removed:
trunk/tests/src/org/jboss/messaging/tests/integration/journal/JournalCleanupIntegrationTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/journal/Journal.java
trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1427 - Reverting changes and moving it to a branch
Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2009-02-17 14:26:25 UTC (rev 5880)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2009-02-17 15:59:21 UTC (rev 5881)
@@ -83,7 +83,7 @@
// Load
- void load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions) throws Exception;
+ long load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions) throws Exception;
int getAlignment() throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2009-02-17 14:26:25 UTC (rev 5880)
+++ trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2009-02-17 15:59:21 UTC (rev 5881)
@@ -22,9 +22,6 @@
package org.jboss.messaging.core.journal;
-import org.jboss.messaging.core.journal.impl.JournalFile;
-
-
/**
*
* A TestableJournal
@@ -35,17 +32,13 @@
*/
public interface TestableJournal extends Journal
{
- int checkAndReclaimFiles() throws Exception;
+ void checkAndReclaimFiles() throws Exception;
int getDataFilesCount();
int getFreeFilesCount();
int getOpenedFilesCount();
-
- void cleanup(int fileID) throws Exception;
-
- JournalFile getJournalFile(int fileID);
int getIDMapSize();
@@ -70,7 +63,7 @@
/** This method could be promoted to {@link Journal} interface when we decide to use the loadManager
* instead of load(List,List)
*/
- void load(LoadManager reloadManager) throws Exception;
+ long load(LoadManager reloadManager) throws Exception;
void forceMoveNextFile() throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2009-02-17 14:26:25 UTC (rev 5880)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2009-02-17 15:59:21 UTC (rev 5881)
@@ -31,7 +31,6 @@
* TODO combine this with JournalFileImpl
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
public interface JournalFile
@@ -40,30 +39,6 @@
void incNegCount(JournalFile file);
- void decNegCount(JournalFile file);
-
- /** Total Negative from other files to this file */
- int getTotalNegCount();
-
- /** Set by the Reclaimer */
- void setTotalNegCount(int total);
-
- /**
- *
- * A list of problematic records that would cause a linked-list effect between two files
- * Information we will need in order to cleanup necessary delete records.
- *
- * To avoid a linked-list effect, we physically remove deleted records from other files, when cleaning up
- * */
- void addCleanupInfo(long id, JournalFile deleteFile);
-
- /**
- * A list of problematic records that would cause a linked-list effect between two files
- * @param id
- * @return The list
- */
- JournalFile getCleanupInfo(long id);
-
int getPosCount();
void incPosCount();
@@ -71,18 +46,7 @@
void decPosCount();
void setCanReclaim(boolean canDelete);
-
- /**
- * Property marking if this file is holding another file from reclaiming because of pending deletes.
- * */
- void setLinkedDependency(boolean hasDependencies);
-
- /**
- * @see JournalFile#setLinkedDependency(boolean)
- * */
- boolean isLinkedDependency();
-
boolean isCanReclaim();
void extendOffset(final int delta);
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2009-02-17 14:26:25 UTC (rev 5880)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2009-02-17 15:59:21 UTC (rev 5881)
@@ -22,16 +22,13 @@
package org.jboss.messaging.core.journal.impl;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.util.Pair;
/**
*
@@ -45,7 +42,7 @@
{
private static final Logger log = Logger.getLogger(JournalFileImpl.class);
- private SequentialFile file;
+ private final SequentialFile file;
private final int orderingID;
@@ -53,18 +50,10 @@
private final AtomicInteger posCount = new AtomicInteger(0);
- private int totalNegCount;
-
private boolean canReclaim;
-
- private boolean linkedDependency;
-
private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
- // When removing an ID on cleanup, we need to know where the delete is coming from
- private final ConcurrentMap<Long, JournalFile> cleanupIDs = new ConcurrentHashMap<Long, JournalFile>();
-
public JournalFileImpl(final SequentialFile file, final int orderingID)
{
this.file = file;
@@ -72,35 +61,6 @@
this.orderingID = orderingID;
}
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.journal.impl.JournalFile#addCleanupInfo(long, org.jboss.messaging.core.journal.impl.JournalFile)
- */
- public void addCleanupInfo(final long id, final JournalFile deleteFile)
- {
- cleanupIDs.put(id, deleteFile);
- }
-
- public JournalFile getCleanupInfo(final long id)
- {
- return cleanupIDs.get(id);
- }
-
- /**
- * @return the totalNegCount
- */
- public int getTotalNegCount()
- {
- return totalNegCount;
- }
-
- /**
- * @param totalNegCount the totalNegCount to set
- */
- public void setTotalNegCount(final int totalNegCount)
- {
- this.totalNegCount = totalNegCount;
- }
-
public int getPosCount()
{
return posCount.intValue();
@@ -121,11 +81,6 @@
getOrCreateNegCount(file).incrementAndGet();
}
- public void decNegCount(final JournalFile file)
- {
- getOrCreateNegCount(file).decrementAndGet();
- }
-
public int getNegCount(final JournalFile file)
{
AtomicInteger count = negCounts.get(file);
@@ -175,23 +130,6 @@
return file;
}
-
- /**
- * @return the linkedDependency
- */
- public boolean isLinkedDependency()
- {
- return linkedDependency;
- }
-
- /**
- * @param linkedDependency the linkedDependency to set
- */
- public void setLinkedDependency(boolean linkedDependency)
- {
- this.linkedDependency = linkedDependency;
- }
-
@Override
public String toString()
{
@@ -232,5 +170,4 @@
return count;
}
-
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-02-17 14:26:25 UTC (rev 5880)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-02-17 15:59:21 UTC (rev 5881)
@@ -45,7 +45,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -90,9 +89,6 @@
private static final int STATE_LOADED = 2;
- // TODO: Should we make this configurable?
- private static final int MAX_LINKED_JOURNAL_FILES = 10;
-
// Static --------------------------------------------------------
private static final Logger log = Logger.getLogger(JournalImpl.class);
@@ -161,20 +157,6 @@
public static final byte ROLLBACK_RECORD = 19;
- // Used by cleanup
- public static final byte CLEANED_ADD_RECORD = 20;
-
- // Used by cleanup
- public static final byte CLEANED_ADD_RECORD_TX = 21;
-
- // Used by cleanup
- public static final byte CLEANED_UPDATE_RECORD = 22;
-
- // Used by cleanup
- public static final byte CLEANED_UPDATE_RECORD_TX = 23;
-
- public static final byte LAST_RECORD_ID = CLEANED_UPDATE_RECORD_TX;
-
public static final byte FILL_CHARACTER = (byte)'J';
// Attributes ----------------------------------------------------
@@ -314,8 +296,20 @@
throw new IllegalStateException("Journal must be loaded first");
}
- ByteBufferWrapper bb = generateAddRecord(true, -1, id, recordType, record);
+ int recordLength = record.getEncodeSize();
+ int size = SIZE_ADD_RECORD + recordLength;
+
+ ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+
+ bb.putByte(ADD_RECORD);
+ bb.putInt(-1); // skip ID part
+ bb.putLong(id);
+ bb.putInt(recordLength);
+ bb.putByte(recordType);
+ record.encode(bb);
+ bb.putInt(size);
+
try
{
JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
@@ -354,8 +348,18 @@
throw new IllegalStateException("Cannot find add info " + id);
}
- ByteBufferWrapper bb = generateUpdateRecord(true, -1, id, recordType, record);
+ int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
+ ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+
+ bb.putByte(UPDATE_RECORD);
+ bb.putInt(-1); // skip ID part
+ bb.putLong(id);
+ bb.putInt(record.getEncodeSize());
+ bb.putByte(recordType);
+ record.encode(bb);
+ bb.putInt(size);
+
try
{
JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
@@ -402,7 +406,7 @@
{
JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
- posFiles.addDelete(id, usedFile);
+ posFiles.addDelete(usedFile);
}
finally
{
@@ -432,9 +436,22 @@
{
throw new IllegalStateException("Journal must be loaded first");
}
+
+ int recordLength = record.getEncodeSize();
- ByteBufferWrapper bb = generateAddTransactionalRecord(true, -1, txID, id, recordType, record);
+ int size = SIZE_ADD_RECORD_TX + recordLength;
+ ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+
+ bb.putByte(ADD_RECORD_TX);
+ bb.putInt(-1); // skip ID part
+ bb.putLong(txID);
+ bb.putLong(id);
+ bb.putInt(recordLength);
+ bb.putByte(recordType);
+ record.encode(bb);
+ bb.putInt(size);
+
try
{
JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
@@ -474,8 +491,19 @@
throw new IllegalStateException("Journal must be loaded first");
}
- ByteBufferWrapper bb = generateUpdateRecordTransactional(true, -1, txID, id, recordType, record);
+ int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
+ ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+
+ bb.putByte(UPDATE_RECORD_TX);
+ bb.putInt(-1); // skip ID part
+ bb.putLong(txID);
+ bb.putLong(id);
+ bb.putInt(record.getEncodeSize());
+ bb.putByte(recordType);
+ record.encode(bb);
+ bb.putInt(size);
+
try
{
JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
@@ -751,221 +779,33 @@
}
- public void cleanup(final JournalFile journalFile) throws Exception
- {
-
- final int fileID = journalFile.getOrderingID();
-
- final SequentialFile sf = journalFile.getFile();
-
- sf.open(maxAIO);
-
- try
- {
- readJournalFile(journalFile, new JournalReader()
- {
-
- public void addRecord(final int recordPos, final RecordInfo recordInfo) throws Exception
- {
- JournalFile cleanupFile = journalFile.getCleanupInfo(recordInfo.id);
- if (cleanupFile != null)
- {
- if (trace)
- {
- trace("Cleaning addRecord id = " + recordInfo.id);
- }
-
- ByteBufferWrapper buffer = generateAddRecord(false,
- fileID,
- recordInfo.id,
- recordInfo.userRecordType,
- new ByteArrayEncoding(recordInfo.data));
-
- buffer.rewind();
-
- sf.position(recordPos);
- sf.write(buffer.getBuffer(), false);
-
- // Eliminating the dependency between a and b
-
- cleanupFile.decNegCount(journalFile);
- journalFile.decPosCount();
- }
-
- }
-
- public void cleanedAddRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
- {
- if (trace)
- {
- trace("Ignored already cleaned TXrecord " + recordInfo.id + ", transactionID = " + transactionID);
- }
- }
-
- public void cleanedAddRecord(final int recordPos, final RecordInfo recordInfo)
- {
- if (trace)
- {
- trace("Ignoring already cleaned record " + recordInfo.id);
- }
- }
-
- public void addRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo) throws Exception
- {
- JournalFile cleanupFile = journalFile.getCleanupInfo(recordInfo.id);
- if (cleanupFile != null)
- {
- if (trace)
- {
- trace("Cleaning addRecordTX record id = " + recordInfo.id + " transactionID = " + transactionID);
- }
-
- ByteBufferWrapper bb = generateAddTransactionalRecord(false,
- fileID,
- transactionID,
- recordInfo.id,
- recordInfo.userRecordType,
- new ByteArrayEncoding(recordInfo.data));
-
- bb.rewind();
-
- sf.position(recordPos);
- sf.write(bb.getBuffer(), false);
-
- // Eliminating the dependency between a and b
-
- cleanupFile.decNegCount(journalFile);
- journalFile.decPosCount();
- }
- }
-
- public void commitRecord(final int recordPos, final long transactionID, final byte[] summaryData)
- {
- }
-
- public void deleteRecord(final int recordPos, final long recordID)
- {
- }
-
- public void deleteRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
- {
- }
-
- public void markAsDataFile()
- {
- }
-
- public void prepareRecord(final int recordPos,
- final long transactionID,
- final byte[] extraData,
- final byte[] summaryData)
- {
- }
-
- public void rollbackRecord(final int recordPos, final long transactionID)
- {
- }
-
- public void updateRecord(final int recordPos, final RecordInfo recordInfo) throws Exception
- {
- JournalFile cleanupFile = journalFile.getCleanupInfo(recordInfo.id);
- if (cleanupFile != null)
- {
- if (trace)
- {
- trace("Cleaning updateRecord = " + recordInfo);
- }
-
- ByteBufferWrapper bb = generateUpdateRecord(false,
- fileID,
- recordInfo.id,
- recordInfo.userRecordType,
- new ByteArrayEncoding(recordInfo.data));
-
- bb.rewind();
-
- sf.position(recordPos);
- sf.write(bb.getBuffer(), false);
-
- // Eliminating the dependency between a and b
-
- cleanupFile.decNegCount(journalFile);
- journalFile.decPosCount();
- }
- }
-
- public void updateRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo) throws Exception
- {
- JournalFile cleanupFile = journalFile.getCleanupInfo(recordInfo.id);
- if (cleanupFile != null)
- {
- if (trace)
- {
- trace("Cleaning updateRecord = " + recordInfo);
- }
-
- ByteBufferWrapper bb = generateUpdateRecordTransactional(false,
- fileID,
- transactionID,
- recordInfo.id,
- recordInfo.userRecordType,
- new ByteArrayEncoding(recordInfo.data));
- bb.rewind();
-
- sf.position(recordPos);
- sf.write(bb.getBuffer(), false);
-
- // Eliminating the dependency between a and b
-
- cleanupFile.decNegCount(journalFile);
- journalFile.decPosCount();
- }
- }
-
- public void cleanedUpdateRecord(final int recordPos, final RecordInfo recordInfo)
- {
-
- }
-
- public void cleanedUpdateRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
- {
- }
-
- });
- }
- finally
- {
- sf.close();
- }
- }
-
/**
* @see JournalImpl#load(LoadManager)
*/
- public synchronized void load(final List<RecordInfo> committedRecords,
+ public synchronized long load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions) throws Exception
{
final Set<Long> recordsToDelete = new HashSet<Long>();
final List<RecordInfo> records = new ArrayList<RecordInfo>();
- load(new LoadManager()
+ long maxID = load(new LoadManager()
{
- public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
+ public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
{
preparedTransactions.add(preparedTransaction);
}
- public void addRecord(final RecordInfo info)
+ public void addRecord(RecordInfo info)
{
records.add(info);
}
- public void updateRecord(final RecordInfo info)
+ public void updateRecord(RecordInfo info)
{
records.add(info);
}
- public void deleteRecord(final long id)
+ public void deleteRecord(long id)
{
recordsToDelete.add(id);
}
@@ -979,7 +819,7 @@
}
}
- return;
+ return maxID;
}
/**
@@ -1017,124 +857,265 @@
* <p> * FileID and NumberOfElements are the transaction summary, and they will be repeated (N)umberOfFiles times </p>
*
* */
- public synchronized void load(final LoadManager loadManager) throws Exception
+ public synchronized long load(final LoadManager loadManager) throws Exception
{
if (state != STATE_STARTED)
{
throw new IllegalStateException("Journal must be in started state");
}
- final Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
+ Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
- final List<JournalFile> orderedFiles = orderFiles();
+ List<JournalFile> orderedFiles = orderFiles();
int lastDataPos = SIZE_HEADER;
- for (JournalFile loopFile : orderedFiles)
- {
- final AtomicBoolean hasData = new AtomicBoolean(false);
- final JournalFile file = loopFile;
+ long maxID = -1;
+ for (JournalFile file : orderedFiles)
+ {
file.getFile().open(1);
- if (trace)
+ ByteBuffer bb = fileFactory.newBuffer(fileSize);
+
+ int bytesRead = file.getFile().read(bb);
+
+ if (bytesRead != fileSize)
{
- trace("loading file " + file);
+ // FIXME - We should extract everything we can from this file
+ // and then we shouldn't ever reuse this file on reclaiming (instead
+ // reclaim on different size files would aways throw the file away)
+ // rather than throw ISE!
+ // We don't want to leave the user with an unusable system
+ throw new IllegalStateException("File is wrong size " + bytesRead +
+ " expected " +
+ fileSize +
+ " : " +
+ file.getFile().getFileName());
}
- try
+ // First long is the ordering timestamp, we just jump its position
+ bb.position(SIZE_HEADER);
+
+ boolean hasData = false;
+
+ while (bb.hasRemaining())
{
+ final int pos = bb.position();
- // We use an inner method here, as the same method read could be used by both load and cleanup
- // We reuse the same method that will treat the data-format for the journal file on both cases
- final int fileLastPos = readJournalFile(file, new JournalReader()
+ byte recordType = bb.get();
+
+ if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
{
+ // I - We scan for any valid record on the file. If a hole
+ // happened on the middle of the file we keep looking until all
+ // the possibilities are gone
+ continue;
+ }
- public void addRecord(final int recordPos, final RecordInfo info)
- {
- if (trace)
- {
- trace("AddRecord: " + info);
- }
- loadManager.addRecord(info);
+ if (bb.position() + SIZE_INT > fileSize)
+ {
+ // II - Ignore this record, lets keep looking
+ continue;
+ }
- posFilesMap.put(info.id, new PosFiles(file));
+ // III - Every record has the file-id.
+ // This is what supports us from not re-filling the whole file
+ int readFileId = bb.getInt();
- hasData.set(true);
+ // IV - This record is from a previous file-usage. The file was
+ // reused and we need to ignore this record
+ if (readFileId != file.getOrderingID())
+ {
+ // If a file has damaged records, we make it a dataFile, and the
+ // next reclaiming will fix it
+ hasData = true;
+ bb.position(pos + 1);
+
+ continue;
+ }
+
+ long transactionID = 0;
+
+ if (isTransaction(recordType))
+ {
+ if (bb.position() + SIZE_LONG > fileSize)
+ {
+ continue;
}
- public void updateRecord(final int recordPos, final RecordInfo recordInfo)
+ transactionID = bb.getLong();
+ }
+
+ long recordID = 0;
+
+ if (!isCompleteTransaction(recordType))
+ {
+ if (bb.position() + SIZE_LONG > fileSize)
{
- if (trace)
- {
- trace("UpdateRecord: " + recordInfo);
- }
+ continue;
+ }
- loadManager.updateRecord(recordInfo);
+ recordID = bb.getLong();
- hasData.set(true);
+ maxID = Math.max(maxID, recordID);
+ }
- PosFiles posFiles = posFilesMap.get(recordInfo.id);
+ // We use the size of the record to validate the health of the
+ // record.
+ // (V) We verify the size of the record
- if (posFiles != null)
- {
- // It's legal for this to be null. The file(s) with the insert may
- // have been deleted
- // just leaving some updates in this file
+ // The variable record portion used on Updates and Appends
+ int variableSize = 0;
- posFiles.addUpdateFile(file);
- }
+ // Used to hold extra data on transaction prepares
+ int preparedTransactionExtraDataSize = 0;
+
+ byte userRecordType = 0;
+
+ byte record[] = null;
+
+ if (isContainsBody(recordType))
+ {
+ if (bb.position() + SIZE_INT > fileSize)
+ {
+ continue;
}
- public void deleteRecord(final int recordPos, final long recordID)
+ variableSize = bb.getInt();
+
+ if (bb.position() + variableSize > fileSize)
{
- if (trace)
- {
- trace("DeleteRecord " + recordID);
- }
- loadManager.deleteRecord(recordID);
+ log.warn("Record at position " + pos +
+ " file:" +
+ file.getFile().getFileName() +
+ " is corrupted and it is being ignored");
+ continue;
+ }
- hasData.set(true);
+ if (recordType != DELETE_RECORD_TX)
+ {
+ userRecordType = bb.get();
+ }
- PosFiles posFiles = posFilesMap.remove(recordID);
+ record = new byte[variableSize];
- if (posFiles != null)
- {
- posFiles.addDelete(recordID, file);
- }
+ bb.get(record);
+ }
+ if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
+ {
+ if (recordType == PREPARE_RECORD)
+ {
+ // Add the variable size required for preparedTransactions
+ preparedTransactionExtraDataSize = bb.getInt();
}
+ // Both commit and record contain the recordSummary, and this is
+ // used to calculate the record-size on both record-types
+ variableSize += bb.getInt() * SIZE_INT * 2;
+ }
- public void cleanedAddRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
+ int recordSize = getRecordSize(recordType);
+
+ // VI - this is completing V, We will validate the size at the end
+ // of the record,
+ // But we avoid buffer overflows by damaged data
+ if (pos + recordSize + variableSize + preparedTransactionExtraDataSize > fileSize)
+ {
+ // Avoid a buffer overflow caused by damaged data... continue
+ // scanning for more records...
+ log.warn("Record at position " + pos +
+ " file:" +
+ file.getFile().getFileName() +
+ " is corrupted and it is being ignored");
+ // If a file has damaged records, we make it a dataFile, and the
+ // next reclaiming will fix it
+ hasData = true;
+
+ continue;
+ }
+
+ int oldPos = bb.position();
+
+ bb.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
+
+ int checkSize = bb.getInt();
+
+ // VII - The checkSize at the end has to match with the size
+ // informed at the beggining.
+ // This is like testing a hash for the record. (We could replace the
+ // checkSize by some sort of calculated hash)
+ if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+ {
+ log.warn("Record at position " + pos +
+ " file:" +
+ file.getFile().getFileName() +
+ " is corrupted and it is being ignored");
+
+ // If a file has damaged records, we make it a dataFile, and the
+ // next reclaiming will fix it
+ hasData = true;
+
+ bb.position(pos + SIZE_BYTE);
+
+ continue;
+ }
+
+ bb.position(oldPos);
+
+ // At this point everything is checked. So we relax and just load
+ // the data now.
+
+ switch (recordType)
+ {
+ case ADD_RECORD:
{
- if (trace)
- {
- trace("cleanedAddRecordTX: " + recordInfo);
- }
+ loadManager.addRecord(new RecordInfo(recordID, userRecordType, record, false));
- JournalTransaction tnp = transactionInfos.get(transactionID);
+ posFilesMap.put(recordID, new PosFiles(file));
- if (tnp == null)
+ hasData = true;
+
+ break;
+ }
+ case UPDATE_RECORD:
+ {
+ loadManager.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
+
+ hasData = true;
+
+ PosFiles posFiles = posFilesMap.get(recordID);
+
+ if (posFiles != null)
{
- tnp = new JournalTransaction();
+ // It's legal for this to be null. The file(s) with the may
+ // have been deleted
+ // just leaving some updates in this file
- transactionInfos.put(transactionID, tnp);
+ posFiles.addUpdateFile(file);
}
- tnp.addSummaryOnly(file);
+ break;
}
-
- public void addRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
+ case DELETE_RECORD:
{
+ loadManager.deleteRecord(recordID);
- if (trace)
+ hasData = true;
+
+ PosFiles posFiles = posFilesMap.remove(recordID);
+
+ if (posFiles != null)
{
- trace((recordInfo.isUpdate ? "updateRecordTX: " : "addRecordTX: ") + recordInfo +
- ", txid = " +
- transactionID);
+ posFiles.addDelete(file);
}
+ break;
+ }
+ case ADD_RECORD_TX:
+ case UPDATE_RECORD_TX:
+ {
TransactionHolder tx = transactions.get(transactionID);
if (tx == null)
@@ -1144,7 +1125,7 @@
transactions.put(transactionID, tx);
}
- tx.recordInfos.add(recordInfo);
+ tx.recordInfos.add(new RecordInfo(recordID, userRecordType, record, recordType == UPDATE_RECORD_TX));
JournalTransaction tnp = transactionInfos.get(transactionID);
@@ -1155,24 +1136,14 @@
transactionInfos.put(transactionID, tnp);
}
- tnp.addPositive(file, recordInfo.id);
+ tnp.addPositive(file, recordID);
- hasData.set(true);
- }
+ hasData = true;
- public void updateRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
- {
- // There is no difference here, so using the same method
- addRecordTX(recordPos, transactionID, recordInfo);
+ break;
}
-
- public void deleteRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
+ case DELETE_RECORD_TX:
{
- if (trace)
- {
- trace("deleteRecordTX " + recordInfo + ", txid = " + transactionID);
- }
-
TransactionHolder tx = transactions.get(transactionID);
if (tx == null)
@@ -1182,7 +1153,7 @@
transactions.put(transactionID, tx);
}
- tx.recordsToDelete.add(recordInfo);
+ tx.recordsToDelete.add(new RecordInfo(recordID, (byte)0, record, true));
JournalTransaction tnp = transactionInfos.get(transactionID);
@@ -1193,21 +1164,14 @@
transactionInfos.put(transactionID, tnp);
}
- tnp.addNegative(file, recordInfo.id);
+ tnp.addNegative(file, recordID);
- hasData.set(true);
- }
+ hasData = true;
- public void prepareRecord(final int recordPos,
- final long transactionID,
- final byte[] extraData,
- final byte[] summaryData)
+ break;
+ }
+ case PREPARE_RECORD:
{
- if (trace)
- {
- trace("prepareRecordTX: txid = " + transactionID);
- }
-
TransactionHolder tx = transactions.get(transactionID);
if (tx == null)
@@ -1218,9 +1182,12 @@
transactions.put(transactionID, tx);
}
+ byte extraData[] = new byte[preparedTransactionExtraDataSize];
+
+ bb.get(extraData);
+
// Pair <FileID, NumberOfElements>
- Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(summaryData.length,
- ByteBuffer.wrap(summaryData));
+ Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
tx.prepared = true;
@@ -1243,28 +1210,22 @@
}
else
{
- log.warn("Prepared transaction " + transactionID +
- " wasn't considered completed, it will be ignored");
+ log.warn("Prepared transaction " + transactionID + " wasn't considered completed, it will be ignored");
tx.invalid = true;
}
- hasData.set(true);
- }
+ hasData = true;
- public void commitRecord(final int recordPos, final long transactionID, final byte[] summaryData)
+ break;
+ }
+ case COMMIT_RECORD:
{
- if (trace)
- {
- trace("commitRecord: txid = " + transactionID);
- }
-
TransactionHolder tx = transactions.remove(transactionID);
// We need to read it even if transaction was not found, or
// the reading checks would fail
// Pair <OrderId, NumberOfElements>
- Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(summaryData.length,
- ByteBuffer.wrap(summaryData));
+ Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
// The commit could be alone on its own journal-file and the
// whole transaction body was reclaimed but not the
@@ -1313,18 +1274,13 @@
journalTransaction.forget();
}
- hasData.set(true);
+ hasData = true;
}
+ break;
}
-
- public void rollbackRecord(final int recordPos, final long transactionID)
+ case ROLLBACK_RECORD:
{
- if (trace)
- {
- trace("rollbackRecord: txid = " + transactionID);
- }
-
TransactionHolder tx = transactions.remove(transactionID);
// The rollback could be alone on its own journal-file and the
@@ -1344,79 +1300,43 @@
// Rollbacks.. We will ignore the data anyway.
tnp.rollback(file);
- hasData.set(true);
+ hasData = true;
}
+ break;
}
-
- public void markAsDataFile()
+ default:
{
- if (trace)
- {
- trace("markAsDataFile");
- }
-
- hasData.set(true);
+ throw new IllegalStateException("Journal " + file.getFile().getFileName() +
+ " is corrupt, invalid record type " +
+ recordType);
}
+ }
- public void cleanedAddRecord(final int recordPos, final RecordInfo recordInfo)
- {
- if (trace)
- {
- trace("cleanedAddRecord: " + recordInfo);
- }
+ checkSize = bb.getInt();
- }
-
- public void cleanedUpdateRecord(final int recordPos, final RecordInfo recordInfo)
- {
- if (trace)
- {
- trace("cleanedUpdateRecord: " + recordInfo);
- }
- }
-
- public void cleanedUpdateRecordTX(final int recordPos,
- final long transactionID,
- final RecordInfo recordInfo)
- {
- if (trace)
- {
- trace("cleanedUpdateRecordTx: " + recordInfo + ", txID = " + transactionID);
- }
-
- JournalTransaction tnp = transactionInfos.get(transactionID);
-
- if (tnp == null)
- {
- tnp = new JournalTransaction();
-
- transactionInfos.put(transactionID, tnp);
- }
-
- tnp.addSummaryOnly(file);
-
- }
-
- });
-
- if (hasData.get())
+ // This is a sanity check about the loading code itself.
+ // If this checkSize doesn't match, it means the reading method is
+ // not doing what it was supposed to do
+ if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
{
- lastDataPos = fileLastPos;
- dataFiles.add(file);
+ throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
}
- else
- {
- // Empty dataFiles with no data
- freeFiles.add(file);
- }
+ lastDataPos = bb.position();
}
- finally
+
+ file.getFile().close();
+
+ if (hasData)
{
- file.getFile().close();
+ dataFiles.add(file);
}
-
+ else
+ {
+ // Empty dataFiles with no data
+ freeFiles.add(file);
+ }
}
// Create any more files we need
@@ -1504,7 +1424,7 @@
checkAndReclaimFiles();
- return;
+ return maxID;
}
public int getAlignment() throws Exception
@@ -1512,31 +1432,9 @@
return fileFactory.getAlignment();
}
- // TestableJournal implementation --------------------------------------------------------------
+ // TestableJournal implementation
+ // --------------------------------------------------------------
- public JournalFile getJournalFile(final int fileID)
- {
- for (JournalFile file : dataFiles)
- {
- if (file.getOrderingID() == fileID)
- {
- return file;
- }
- }
-
- return null;
- }
-
- public void cleanup(final int fileID) throws Exception
- {
- JournalFile file = getJournalFile(fileID);
-
- if (file != null)
- {
- cleanup(file);
- }
- }
-
public void setAutoReclaim(final boolean autoReclaim)
{
this.autoReclaim = autoReclaim;
@@ -1558,12 +1456,8 @@
builder.append("DataFile:" + file +
" posCounter = " +
file.getPosCount() +
- " totalNegative = " +
- file.getTotalNegCount() +
" reclaimStatus = " +
file.isCanReclaim() +
- " linkedDependency = " +
- file.isLinkedDependency() +
"\n");
if (file instanceof JournalFileImpl)
{
@@ -1624,19 +1518,10 @@
}
- public int checkAndReclaimFiles() throws Exception
+ public void checkAndReclaimFiles() throws Exception
{
- Set<JournalFile> empty = Collections.emptySet();
- return checkAndReclaimFiles(empty, false);
- }
+ checkReclaimStatus();
- /**
- * When we cleanup a file, we reschedule a check, but we can't cleanup the same file again, or we may get on an infinite loop
- * */
- private int checkAndReclaimFiles(final Set<JournalFile> cleanedFiles, final boolean performCleanup) throws Exception
- {
- int secondCriterionCount = checkReclaimStatus();
-
for (JournalFile file : dataFiles)
{
if (file.isCanReclaim())
@@ -1667,71 +1552,6 @@
}
}
}
-
- if (performCleanup)
- {
- if (secondCriterionCount > MAX_LINKED_JOURNAL_FILES)
- {
- JournalFile cleanupFile = null;
-
- // Will look for the first file that has many dependencies.
- // This is because a single file could be holding all the records
- for (JournalFile file : dataFiles)
- {
- if (file.isLinkedDependency())
- {
- if (cleanedFiles.contains(file))
- {
- if (trace)
- {
- trace("File " + file + " was already cleaned, getting next one");
- }
-
- // However in some exceptional cases, the file could still have a dependency after cleaned (commits on different files for instance)
- // Because of that, on the subsequent scheduled cleanups (if any), we need to ensure we don't reprocess the same file or we would
- // be in an infinite loop
- continue;
- }
-
- cleanupFile = file;
- break;
- }
- }
-
- if (cleanupFile != null)
- {
- log.info("System has too many linked files on deletes(" + secondCriterionCount +
- "), performing a cleanup on " + cleanupFile);
-
- if (trace)
- {
- trace("Cleaning up " + cleanupFile);
- }
-
- cleanedFiles.add(cleanupFile);
- cleanup(cleanupFile);
-
- filesExecutor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- checkAndReclaimFiles(cleanedFiles, true);
- }
- catch (Exception e)
- {
- log.error(e.getMessage(), e);
- }
- }
- });
-
- }
-
- }
- }
-
- return secondCriterionCount;
}
public int getDataFilesCount()
@@ -1819,7 +1639,7 @@
public synchronized void stop() throws Exception
{
trace("Stopping the journal");
-
+
if (state == STATE_STOPPED)
{
throw new IllegalStateException("Journal is already stopped");
@@ -1864,168 +1684,17 @@
}
}
- // Public --------------------------------------------------------------------------------
+ // Public
+ // -----------------------------------------------------------------------------
- // Private -------------------------------------------------------------------------------
+ // Private
+ // -----------------------------------------------------------------------------
-
- // Private Methods that encapsulate data format generation -------------------------------
-
- /**
- * @param id
- * @param recordType
- * @param record
- * @return
- */
- private ByteBufferWrapper generateAddRecord(final boolean addRecord,
- final int fileID,
- final long id,
- final byte userRecordType,
- final EncodingSupport record)
+ private void checkReclaimStatus() throws Exception
{
- int recordLength = record.getEncodeSize();
-
- int size = SIZE_ADD_RECORD + recordLength;
-
- ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
-
- if (addRecord)
- {
- bb.putByte(ADD_RECORD);
- }
- else
- {
- bb.putByte(CLEANED_ADD_RECORD);
- }
-
- bb.putInt(fileID);
- bb.putLong(id);
- bb.putInt(recordLength);
- bb.putByte(userRecordType);
- record.encode(bb);
- bb.putInt(size);
-
- return bb;
- }
-
- /**
- * @param id
- * @param recordType
- * @param record
- * @return
- */
- private ByteBufferWrapper generateUpdateRecord(final boolean isUpdateRecord,
- final int fileID,
- final long id,
- final byte recordType,
- final EncodingSupport record)
- {
- int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
-
- ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
-
- if (isUpdateRecord)
- {
- bb.putByte(UPDATE_RECORD);
- }
- else
- {
- bb.putByte(CLEANED_UPDATE_RECORD);
- }
- bb.putInt(fileID); // skip ID part
- bb.putLong(id);
- bb.putInt(record.getEncodeSize());
- bb.putByte(recordType);
- record.encode(bb);
- bb.putInt(size);
- return bb;
- }
-
- /**
- * @param txID
- * @param id
- * @param recordType
- * @param record
- * @return
- */
- private ByteBufferWrapper generateAddTransactionalRecord(final boolean addRecord,
- final int fileID,
- final long txID,
- final long id,
- final byte recordType,
- final EncodingSupport record)
- {
- int recordLength = record.getEncodeSize();
-
- int size = SIZE_ADD_RECORD_TX + recordLength;
-
- ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
-
- if (addRecord)
- {
- bb.putByte(ADD_RECORD_TX);
- }
- else
- {
- bb.putByte(CLEANED_ADD_RECORD_TX);
- }
- bb.putInt(fileID); // skip ID part
- bb.putLong(txID);
- bb.putLong(id);
- bb.putInt(recordLength);
- bb.putByte(recordType);
- record.encode(bb);
- bb.putInt(size);
- return bb;
- }
-
- /**
- * @param txID
- * @param id
- * @param recordType
- * @param record
- * @return
- */
- private ByteBufferWrapper generateUpdateRecordTransactional(final boolean isUpdateRecord,
- final int fileID,
- final long txID,
- final long id,
- final byte recordType,
- final EncodingSupport record)
- {
- int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
-
- ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
-
- if (isUpdateRecord)
- {
- bb.putByte(UPDATE_RECORD_TX);
- }
- else
- {
- bb.putByte(CLEANED_UPDATE_RECORD_TX);
- }
-
- bb.putInt(fileID); // skip ID part
- bb.putLong(txID);
- bb.putLong(id);
- bb.putInt(record.getEncodeSize());
- bb.putByte(recordType);
- record.encode(bb);
- bb.putInt(size);
- return bb;
- }
-
- // -------------------------------------------------------------------------------------------------------
-
- /**
- * @return the number of linkedFiles
- */
- private int checkReclaimStatus() throws Exception
- {
JournalFile[] files = new JournalFile[dataFiles.size()];
- return reclaimer.scan(dataFiles.toArray(files));
+ reclaimer.scan(dataFiles.toArray(files));
}
// Discard the old JournalFile and set it with a new ID
@@ -2033,17 +1702,6 @@
{
int newOrderingID = generateOrderingID();
- return reinitializeFile(file, newOrderingID);
- }
-
- /**
- * @param file
- * @param newOrderingID
- * @return
- * @throws Exception
- */
- private JournalFile reinitializeFile(final JournalFile file, final int newOrderingID) throws Exception
- {
SequentialFile sf = file.getFile();
sf.open(1);
@@ -2213,8 +1871,6 @@
{
return recordType == ADD_RECORD_TX || recordType == UPDATE_RECORD_TX ||
recordType == DELETE_RECORD_TX ||
- recordType == CLEANED_ADD_RECORD_TX ||
- recordType == CLEANED_UPDATE_RECORD_TX ||
isCompleteTransaction(recordType);
}
@@ -2225,9 +1881,7 @@
private boolean isContainsBody(final byte recordType)
{
- return recordType >= ADD_RECORD && recordType <= DELETE_RECORD_TX ||
- recordType >= CLEANED_ADD_RECORD &&
- recordType <= CLEANED_UPDATE_RECORD_TX;
+ return recordType >= ADD_RECORD && recordType <= DELETE_RECORD_TX;
}
private int getRecordSize(final byte recordType)
@@ -2237,19 +1891,15 @@
switch (recordType)
{
case ADD_RECORD:
- case CLEANED_ADD_RECORD:
recordSize = SIZE_ADD_RECORD;
break;
case UPDATE_RECORD:
- case CLEANED_UPDATE_RECORD:
recordSize = SIZE_UPDATE_RECORD;
break;
case ADD_RECORD_TX:
- case CLEANED_ADD_RECORD_TX:
recordSize = SIZE_ADD_RECORD_TX;
break;
case UPDATE_RECORD_TX:
- case CLEANED_UPDATE_RECORD_TX:
recordSize = SIZE_UPDATE_RECORD_TX;
break;
case DELETE_RECORD:
@@ -2512,7 +2162,7 @@
{
try
{
- checkAndReclaimFiles(new HashSet<JournalFile>(), true);
+ checkAndReclaimFiles();
}
catch (Exception e)
{
@@ -2542,17 +2192,6 @@
* */
private void pushOpenedFile() throws Exception
{
- JournalFile nextOpenedFile = openNewFile();
-
- openedFiles.offer(nextOpenedFile);
- }
-
- /**
- * @return
- * @throws Exception
- */
- private JournalFile openNewFile() throws Exception
- {
JournalFile nextOpenedFile = null;
try
{
@@ -2570,7 +2209,8 @@
{
openFile(nextOpenedFile);
}
- return nextOpenedFile;
+
+ openedFiles.offer(nextOpenedFile);
}
private void closeFile(final JournalFile file)
@@ -2644,310 +2284,6 @@
}
}
- /**
- * This method encapsulates the Journal file reading, used by both compact and cleanup.
- *
- * Instead of duplicating the method of reading the file, we have this method that could be used by both loading a cleanup.
- *
- * @param file
- * @param reader
- * @return true if it has damaged data
- * @throws Exception
- */
- private int readJournalFile(final JournalFile file, final JournalReader reader) throws Exception
- {
- ByteBuffer bb = fileFactory.newBuffer(fileSize);
-
- file.getFile().read(bb);
-
- int lastDataPos = SIZE_HEADER;
-
- // First long is the ordering timestamp, we just jump its position
- bb.position(SIZE_HEADER);
-
- while (bb.hasRemaining())
- {
- final int recordPos = bb.position();
-
- byte recordType = bb.get();
-
- if (recordType < ADD_RECORD || recordType > LAST_RECORD_ID)
- {
- // I - We scan for any valid record on the file. If a hole
- // happened on the middle of the file we keep looking until all
- // the possibilities are gone
- continue;
- }
-
- if (bb.position() + SIZE_INT > fileSize)
- {
- // II - Ignore this record, lets keep looking
- continue;
- }
-
- // III - Every record has the file-id.
- // This is what supports us from not re-filling the whole file
- int readFileId = bb.getInt();
-
- // IV - This record is from a previous file-usage. The file was
- // reused and we need to ignore this record
- if (readFileId != file.getOrderingID())
- {
- // If a file has damaged records, we make it a dataFile, and the
- // next reclaiming will fix it
- reader.markAsDataFile();
-
- bb.position(recordPos + 1);
-
- continue;
- }
-
- long transactionID = 0;
-
- if (isTransaction(recordType))
- {
- if (bb.position() + SIZE_LONG > fileSize)
- {
- continue;
- }
-
- transactionID = bb.getLong();
- }
-
- long recordID = 0;
-
- if (!isCompleteTransaction(recordType))
- {
- if (bb.position() + SIZE_LONG > fileSize)
- {
- continue;
- }
-
- recordID = bb.getLong();
- }
-
- // We use the size of the record to validate the health of the
- // record.
- // (V) We verify the size of the record
-
- // The variable record portion used on Updates and Appends
- int variableSize = 0;
-
- // Used to hold extra data on transaction prepares
- int preparedTransactionExtraDataSize = 0;
-
- byte userRecordType = 0;
-
- byte record[] = null;
-
- if (isContainsBody(recordType))
- {
- if (bb.position() + SIZE_INT > fileSize)
- {
- continue;
- }
-
- variableSize = bb.getInt();
-
- if (bb.position() + variableSize > fileSize)
- {
- log.warn("Record at position " + recordPos +
- " file:" +
- file.getFile().getFileName() +
- " is corrupted and it is being ignored");
- continue;
- }
-
- if (recordType != DELETE_RECORD_TX)
- {
- userRecordType = bb.get();
- }
-
- record = new byte[variableSize];
-
- bb.get(record);
- }
-
- if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
- {
- if (recordType == PREPARE_RECORD)
- {
- // Add the variable size required for preparedTransactions
- preparedTransactionExtraDataSize = bb.getInt();
- }
- // Both commit and record contain the recordSummary, and this is
- // used to calculate the record-size on both record-types
- variableSize += bb.getInt() * SIZE_INT * 2;
- }
-
- int recordSize = getRecordSize(recordType);
-
- // VI - this is completing V, We will validate the size at the end
- // of the record,
- // But we avoid buffer overflows by damaged data
- if (recordPos + recordSize + variableSize + preparedTransactionExtraDataSize > fileSize)
- {
- // Avoid a buffer overflow caused by damaged data... continue
- // scanning for more records...
- log.warn("Record at position " + recordPos +
- " file:" +
- file.getFile().getFileName() +
- " is corrupted and it is being ignored");
- // If a file has damaged records, we make it a dataFile, and the
- // next reclaiming will fix it
- reader.markAsDataFile();
-
- continue;
- }
-
- int oldPos = bb.position();
-
- bb.position(recordPos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
-
- int checkSize = bb.getInt();
-
- // VII - The checkSize at the end has to match with the size
- // informed at the beggining.
- // This is like testing a hash for the record. (We could replace the
- // checkSize by some sort of calculated hash)
- if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
- {
- log.warn("Record at position " + recordPos +
- " file:" +
- file.getFile().getFileName() +
- " is corrupted and it is being ignored");
-
- // If a file has damaged records, we make it a dataFile, and the
- // next reclaiming will fix it
- reader.markAsDataFile();
-
- bb.position(recordPos + SIZE_BYTE);
-
- continue;
- }
-
- bb.position(oldPos);
-
- // At this point everything is checked. So we relax and just load
- // the data now.
-
- switch (recordType)
- {
- case ADD_RECORD:
- {
- reader.addRecord(recordPos, new RecordInfo(recordID, userRecordType, record, false));
- break;
- }
- case CLEANED_ADD_RECORD:
- {
- reader.cleanedAddRecord(recordPos, new RecordInfo(recordID, userRecordType, record, false));
- break;
- }
- case UPDATE_RECORD:
- {
- reader.updateRecord(recordPos, new RecordInfo(recordID, userRecordType, record, true));
-
- break;
- }
- case CLEANED_UPDATE_RECORD:
- {
- reader.cleanedUpdateRecord(recordPos, new RecordInfo(recordID, userRecordType, record, true));
-
- break;
- }
- case DELETE_RECORD:
- {
- reader.deleteRecord(recordPos, recordID);
-
- break;
- }
-
- case CLEANED_ADD_RECORD_TX:
- {
- reader.cleanedAddRecordTX(recordPos, transactionID, new RecordInfo(recordID,
- userRecordType,
- record,
- false));
- break;
- }
-
- case ADD_RECORD_TX:
- {
- reader.addRecordTX(recordPos, transactionID, new RecordInfo(recordID, userRecordType, record, false));
- break;
- }
- case UPDATE_RECORD_TX:
- {
- reader.updateRecordTX(recordPos, transactionID, new RecordInfo(recordID, userRecordType, record, true));
- break;
- }
- case CLEANED_UPDATE_RECORD_TX:
- {
- reader.cleanedUpdateRecordTX(recordPos, transactionID, new RecordInfo(recordID,
- userRecordType,
- record,
- true));
- break;
- }
- case DELETE_RECORD_TX:
- {
- reader.deleteRecordTX(recordPos, transactionID, new RecordInfo(recordID, (byte)0, record, true));
- break;
- }
- case PREPARE_RECORD:
- {
- byte extraData[] = new byte[preparedTransactionExtraDataSize];
-
- bb.get(extraData);
-
- byte[] summaryData = new byte[variableSize];
- bb.get(summaryData);
-
- reader.prepareRecord(recordPos, transactionID, extraData, summaryData);
-
- break;
- }
- case COMMIT_RECORD:
- {
-
- byte[] summaryData = new byte[variableSize];
- bb.get(summaryData);
-
- reader.commitRecord(recordPos, transactionID, summaryData);
-
- break;
- }
- case ROLLBACK_RECORD:
- {
- reader.rollbackRecord(recordPos, transactionID);
- break;
- }
- default:
- {
- throw new IllegalStateException("Journal " + file.getFile().getFileName() +
- " is corrupt, invalid record type " +
- recordType);
- }
-
- }
-
- checkSize = bb.getInt();
-
- // This is a sanity check about the loading code itself.
- // If this checkSize doesn't match, it means the reading method is
- // not doing what it was supposed to do
- if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
- {
- throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
- }
-
- lastDataPos = bb.position();
- }
-
- return lastDataPos;
-
- }
-
public ByteBuffer newBuffer(final int size)
{
return buffersControl.newBuffer(size);
@@ -3023,38 +2359,18 @@
updateFile.incPosCount();
}
- void addDelete(final long id, final JournalFile deleteFile)
+ void addDelete(final JournalFile file)
{
- if (addFile != deleteFile)
- {
- addFile.addCleanupInfo(id, deleteFile);
- }
+ file.incNegCount(addFile);
- deleteFile.incNegCount(addFile);
-
if (updateFiles != null)
{
- for (JournalFile updateF : updateFiles)
+ for (JournalFile jf : updateFiles)
{
- if (addFile != updateF)
- {
-
- // cleanup dependency between updateFile and deleteFile
-
- // Say you have this scenario: (A=Add, U=Update, D=Delete)
- // File1: A1
- // File2: U1
- // File3: D1
-
- // I need to cleanup the counter between D1 and A1, and the counter between D1 and U1
- updateF.addCleanupInfo(id, deleteFile);
- }
-
- deleteFile.incNegCount(updateF);
+ file.incNegCount(jf);
}
}
}
-
}
/** Class that will control buffer-reuse */
@@ -3153,12 +2469,6 @@
return numberOfElementsPerFile;
}
- // Used after cleanup records
- public void addSummaryOnly(final JournalFile file)
- {
- getCounter(file).incrementAndGet();
- }
-
public void addPositive(final JournalFile file, final long id)
{
getCounter(file).incrementAndGet();
@@ -3216,7 +2526,7 @@
if (posFiles != null)
{
- posFiles.addDelete(n.b, n.a);
+ posFiles.addDelete(n.a);
}
}
}
@@ -3298,7 +2608,8 @@
}
}
-
+
+
private class ByteArrayEncoding implements EncodingSupport
{
@@ -3343,95 +2654,5 @@
}
- /**
- *
- *
- * Abstraction used to read journal files.
- *
- *
- */
- private static interface JournalReader
- {
- void addRecord(int recordPos, RecordInfo info) throws Exception;
- /**
- * @param recordPos
- * @param transactionID
- * @param recordInfo
- */
- void cleanedUpdateRecordTX(int recordPos, long transactionID, RecordInfo recordInfo) throws Exception;
-
- /**
- * @param recordInfo
- */
- void cleanedUpdateRecord(int recordPos, RecordInfo recordInfo) throws Exception;
-
- /**
- * @param recordPos
- * @param transactionID
- * @param recordInfo
- */
- void cleanedAddRecordTX(int recordPos, long transactionID, RecordInfo recordInfo) throws Exception;
-
- /**
- * @param recordPos
- * @param recordInfo
- */
- void cleanedAddRecord(int recordPos, RecordInfo recordInfo) throws Exception;
-
- /**
- * @param recordInfo
- * @throws Exception
- */
- void updateRecord(int recordPos, RecordInfo recordInfo) throws Exception;
-
- /**
- * @param recordID
- */
- void deleteRecord(int recordPos, long recordID) throws Exception;
-
- /**
- * @param transactionID
- * @param recordInfo
- * @throws Exception
- */
- void addRecordTX(int recordPos, long transactionID, RecordInfo recordInfo) throws Exception;
-
- /**
- * @param transactionID
- * @param recordInfo
- * @throws Exception
- */
- void updateRecordTX(int recordPos, long transactionID, RecordInfo recordInfo) throws Exception;
-
- /**
- * @param transactionID
- * @param recordInfo
- */
- void deleteRecordTX(int recordPos, long transactionID, RecordInfo recordInfo) throws Exception;
-
- /**
- * @param transactionID
- * @param extraData
- * @param summaryData
- */
- void prepareRecord(int recordPos, long transactionID, byte[] extraData, byte[] summaryData) throws Exception;
-
- /**
- * @param transactionID
- * @param summaryData
- */
- void commitRecord(int recordPos, long transactionID, byte[] summaryData) throws Exception;
-
- /**
- * @param transactionID
- */
- void rollbackRecord(int recordPos, long transactionID) throws Exception;
-
- /**
- *
- */
- void markAsDataFile();
- }
-
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java 2009-02-17 14:26:25 UTC (rev 5880)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java 2009-02-17 15:59:21 UTC (rev 5881)
@@ -44,7 +44,6 @@
* <p>WIKI Page: <a href="http://wiki.jboss.org/wiki/JBossMessaging2Reclaiming">http://wiki.jboss.org/wiki/JBossMessaging2Reclaiming</a></p>
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
public class Reclaimer
@@ -58,19 +57,14 @@
log.trace(message);
}
- /** Returns the number of files holding second criterion (linked-list effect) */
- public int scan(final JournalFile[] files)
+ public void scan(final JournalFile[] files)
{
- int secondCriterionCount = 0;
-
for (int i = 0; i < files.length; i++)
{
- // First we evaluate criterion 1) (Which is simple reference counting)
+ // First we evaluate criterion 1)
JournalFile currentFile = files[i];
- currentFile.setLinkedDependency(false);
-
int posCount = currentFile.getPosCount();
int totNeg = 0;
@@ -94,14 +88,10 @@
}
currentFile.setCanReclaim(true);
-
- // This attribute would be helpful on calculating % usage of a file..
- // And it is also useful for debugging
- currentFile.setTotalNegCount(totNeg);
if (posCount <= totNeg)
{
- // Now we evaluate criterion 2) (This file shouldn't have delete records on non reclaimable files)
+ // Now we evaluate criterion 2)
for (int j = 0; j <= i; j++)
{
@@ -124,11 +114,6 @@
currentFile.setCanReclaim(false);
- // This file is holding currentFile from being reclaimed, hence we set it as a linked dependency
- file.setLinkedDependency(true);
-
- secondCriterionCount++;
-
break;
}
}
@@ -139,7 +124,5 @@
currentFile.setCanReclaim(false);
}
}
-
- return secondCriterionCount;
}
}
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/journal/JournalCleanupIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/JournalCleanupIntegrationTest.java 2009-02-17 14:26:25 UTC (rev 5880)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/JournalCleanupIntegrationTest.java 2009-02-17 15:59:21 UTC (rev 5881)
@@ -1,230 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.journal;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.MessageHandler;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.journal.impl.JournalImpl;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.persistence.impl.journal.JournalStorageManager;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.tests.integration.xa.BasicXaTest;
-import org.jboss.messaging.tests.util.ServiceTestBase;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A JournalCleanupIntegrationTest
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- * Created Feb 13, 2009 6:55:48 PM
- *
- *
- */
-public class JournalCleanupIntegrationTest extends ServiceTestBase
-{
- private static Logger log = Logger.getLogger(BasicXaTest.class);
-
- private final Map<String, AddressSettings> addressSettings = new HashMap<String, AddressSettings>();
-
- private MessagingService messagingService;
-
- private ClientSessionFactory sessionFactory;
-
- private Configuration configuration;
-
- private final SimpleString a1 = new SimpleString("a1");
-
- private final SimpleString a2 = new SimpleString("a2");
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- clearData();
- addressSettings.clear();
- configuration = createDefaultConfig();
- configuration.setSecurityEnabled(false);
- configuration.setJournalMinFiles(2);
- configuration.setPagingDirectory(getPageDir());
-
- messagingService = createService(true, configuration, addressSettings);
-
- // start the server
- messagingService.start();
-
- sessionFactory = createInVMFactory();
- ClientSession clientSession;
- clientSession = sessionFactory.createSession(true, false, false);
-
- clientSession.createQueue(a1, a1, null, true, true);
- clientSession.createQueue(a2, a2, null, true, true);
- clientSession.close();
- }
-
- protected void tearDown() throws Exception
- {
-
- if (messagingService != null && messagingService.isStarted())
- {
- messagingService.stop();
- }
-
- super.tearDown();
- }
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testAutoCleanup() throws Exception
- {
- ClientSession clientSession = sessionFactory.createSession(false, true, true);
-
- ClientSession sessionConsumer = sessionFactory.createSession(null, null, false, true, true, false, 0);
-
- int NUMBER_OF_MESSAGES = 6000;
-
- CountDownLatch latch = new CountDownLatch(NUMBER_OF_MESSAGES);
-
- try
- {
- ClientProducer prod = clientSession.createProducer(a1);
- prod.send(createTextMessage(clientSession, "hello"));
- prod.close();
-
- ClientConsumer cons = sessionConsumer.createConsumer(a2);
-
- sessionConsumer.start();
-
- LocalHandler handler = new LocalHandler(latch);
-
- cons.setMessageHandler(handler);
-
- prod = clientSession.createProducer(a2);
-
- for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
- {
- prod.send(createBytesMessage(clientSession, new byte[1024], true));
- }
-
- latch.await(10, TimeUnit.SECONDS);
-
- if (handler.e != null)
- {
- throw handler.e;
- }
-
- JournalStorageManager storage = (JournalStorageManager)messagingService.getServer().getStorageManager();
- JournalImpl journal = (JournalImpl)storage.getMessageJournal();
-
- // The cleanup is asynchronous, we keep trying the condition until a timeout of 15 seconds
- for (long timeout = System.currentTimeMillis() + 15000; timeout > System.currentTimeMillis();)
- {
- // Wait the current task to finish before we test the condition again
- journal.debugWait();
-
- if (journal.getDataFilesCount() <= 5)
- {
- break;
- }
- }
-
- assertTrue("DataFilesCount supposed to be less than 5, but it was " + journal.getDataFilesCount(), journal.getDataFilesCount() <= 5);
-
-
- cons.close();
- cons = sessionConsumer.createConsumer(a1);
-
- ClientMessage mess = cons.receive(1000);
-
- assertNotNull(mess);
- mess.acknowledge();
- sessionConsumer.commit();
-
- journal.forceMoveNextFile();
- journal.checkAndReclaimFiles();
-
- assertEquals(0, journal.getDataFilesCount());
- }
- finally
- {
- clientSession.close();
- sessionConsumer.close();
- }
-
- }
-
- class LocalHandler implements MessageHandler
- {
- Exception e;
-
- CountDownLatch latch;
-
- LocalHandler(CountDownLatch latch)
- {
- this.latch = latch;
- }
-
- public void onMessage(ClientMessage message)
- {
- try
- {
- message.acknowledge();
- latch.countDown();
- }
- catch (MessagingException e)
- {
- this.e = e;
- }
- }
-
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-02-17 14:26:25 UTC (rev 5880)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-02-17 15:59:21 UTC (rev 5881)
@@ -3018,552 +3018,6 @@
assertEquals(0, journal.getDataFilesCount());
}
- public void testCleanupNonTransactional() throws Exception
- {
- setup(2, 20 * 1024, true);
-
- createJournal();
- startJournal();
- load();
-
- add(1, 2, 3, 4, 5, 6, 7, 8, 9);
- add(50);
- delete(50);
- System.out.println("Data = " + journal.getDataFilesCount());
-
- journal.forceMoveNextFile();
-
- update(1, 2, 3, 4, 5, 6, 7, 8);
-
- journal.forceMoveNextFile();
-
- System.out.println("Data = " + journal.getDataFilesCount());
-
- delete(1, 2, 3, 4, 5, 6, 7, 8);
-
- add(10, 11, 12, 13, 14, 15);
- System.out.println("Data = " + journal.getDataFilesCount());
-
- journal.forceMoveNextFile();
-
- System.out.println("Data = " + journal.getDataFilesCount());
- delete(10, 11, 12, 13, 14, 15);
-
- System.out.println("Data = " + journal.getDataFilesCount());
- journal.checkAndReclaimFiles();
- System.out.println("Data = " + journal.getDataFilesCount());
-
- System.out.println("Before ********************************************");
- System.out.println(journal.debug());
-
- journal.forceMoveNextFile();
-
- System.out.println("After ********************************************");
-
- System.out.println("Journal: " + journal.debug());
-
- journal.checkAndReclaimFiles();
- System.out.println("Data = " + journal.getDataFilesCount());
-
- log.debug("Debug on Journal before stopJournal - \n" + debugJournal());
-
- journal.cleanup(1);
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- assertEquals(0, journal.getDataFilesCount());
-
- add(99);
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- assertEquals(0, journal.getDataFilesCount());
-
- delete(99);
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- assertEquals(0, journal.getDataFilesCount());
-
- }
-
- public void testCleanupTransactional() throws Exception
- {
- setup(2, 20 * 1024, true);
-
- createJournal();
- startJournal();
- load();
-
- // File 1
- {
- addTx(1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
- System.out.println("Data = " + journal.getDataFilesCount());
-
- // 9 positives
- }
-
- journal.forceMoveNextFile();
-
- System.out.println("File 1: ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- // File 2
- {
- addTx(1, 10, 11, 12, 13, 14, 15);
- commit(1);
- add(16);
- }
-
- journal.forceMoveNextFile();
-
- System.out.println("File 2: ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- // File 3
- {
- delete(16);
- update(1, 2, 3, 4, 5, 6, 7, 8);
- delete(10, 11, 12, 13, 14, 15);
- }
-
- journal.forceMoveNextFile();
-
- System.out.println("File 3: ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- // file 4
- {
- delete(1, 2, 3, 4, 5, 6, 7, 8);
- }
-
- journal.forceMoveNextFile();
-
- System.out.println("File 4: ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- journal.cleanup(1);
-
- System.out.println("After Cleanup 1: ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- journal.cleanup(2);
-
-
- journal.checkAndReclaimFiles();
- System.out.println("After Reclaim on Cleanup 1: ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- System.out.println("After reload ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- // assertEquals(2, journal.getDataFilesCount());
-
- delete(9);
-
- journal.forceMoveNextFile();
-
- journal.checkAndReclaimFiles();
-
- System.out.println("journal = " + journal.getDataFilesCount());
-
- add(100);
- update(100);
- delete(100);
-
- journal.forceMoveNextFile();
-
- System.out.println("After add-update-delete on same file ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- journal.checkAndReclaimFiles();
-
- System.out.println("Final ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- }
-
- public void testCleanupWithDeleteUpdatesDifferentFiles() throws Exception
- {
- setup(2, 20 * 1024, true);
-
- createJournal();
- startJournal();
- load();
-
- // File 1
- {
- addTx(1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
- commit(1);
- }
-
- journal.forceMoveNextFile();
-
- System.out.println("File 1: ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- // File 2
- {
- updateTx(2, 1, 2, 3, 4, 5, 6, 7, 8, 9);
- commit(2);
- }
-
- journal.forceMoveNextFile();
-
- System.out.println("File 2: ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- // File 3
- {
- delete(1, 2, 3, 4, 5, 6, 7, 8);
- }
-
- journal.forceMoveNextFile();
-
- System.out.println("Holding for secondLevel = " + journal.checkAndReclaimFiles());
-
- assertTrue(journal.getJournalFile(1).isLinkedDependency());
- assertFalse(journal.getJournalFile(2).isLinkedDependency());
- assertFalse(journal.getJournalFile(3).isLinkedDependency());
-
- System.out.println("File 3: ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- journal.cleanup(1);
-
- System.out.println("After Cleanup 1: ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- journal.checkAndReclaimFiles();
-
- delete(9);
-
- journal.forceMoveNextFile();
-
- journal.checkAndReclaimFiles();
-
- System.out.println("Final ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- }
-
- public void testCleanupWithoutDeleteUpdatesDifferentFiles() throws Exception
- {
- setup(2, 20 * 1024, true);
-
- createJournal();
- startJournal();
- load();
-
- // File 1
- {
- addTx(1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
- commit(1);
- }
-
- journal.forceMoveNextFile();
-
- System.out.println("File 1: ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- // File 2
- {
- updateTx(2, 1, 2, 3, 4, 5, 6, 7, 8, 9);
- commit(2);
- }
-
- journal.forceMoveNextFile();
-
- System.out.println("File 2: ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- // File 3
- {
- delete(1, 2, 3, 4, 5, 6, 7, 8);
- }
-
- journal.forceMoveNextFile();
-
- System.out.println("File 3: ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- journal.cleanup(1);
-
- System.out.println("After Cleanup 1: ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- journal.cleanup(2);
-
- journal.forceMoveNextFile();
-
- journal.checkAndReclaimFiles();
-
- System.out.println("Final ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- assertEquals(1, journal.getDataFilesCount());
-
- delete(9);
-
- journal.forceMoveNextFile();
-
- journal.checkAndReclaimFiles();
-
- assertEquals(0, journal.getDataFilesCount());
-
- System.out.println("data:" + journal.getDataFilesCount());
-
- System.out.println("After Delete ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- }
-
- public void testCleanupWithUpdatesSameFiles() throws Exception
- {
- setup(2, 20 * 1024, true);
-
- createJournal();
- startJournal();
- load();
-
- // File 1
- {
- addTx(1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
- commit(1);
- update(1, 2, 3, 4, 5, 6, 7, 8);
- }
-
- journal.forceMoveNextFile();
-
- System.out.println("File 1: ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- // File 2
- {
- delete(1, 2, 3, 4, 5, 6, 7, 8);
- }
-
- journal.forceMoveNextFile();
-
- System.out.println("File 2: ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- journal.cleanup(1);
-
- System.out.println("After Cleanup 1: ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- journal.forceMoveNextFile();
- add(20);
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- System.out.println("Final ********************************************");
- System.out.println(journal.debug());
- System.out.println("***************************************************");
-
- }
-
- public void testCleanupWholeFile() throws Exception
- {
- setup(2, 20 * 1024, true);
-
- createJournal();
- startJournal();
- load();
-
- add(50);
- delete(50);
-
- addTx(1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
- System.out.println("Data = " + journal.getDataFilesCount());
-
- journal.forceMoveNextFile();
-
- addTx(1, 10, 11, 12, 13, 14, 15);
-
- commit(1);
-
- journal.forceMoveNextFile();
-
- delete(1, 2, 3, 4, 5, 6, 7, 8, 9);
-
- journal.forceMoveNextFile();
-
- journal.checkAndReclaimFiles();
-
- assertEquals(1, journal.getDataFilesCount());
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- delete(10, 11, 12, 13, 14, 15);
- journal.checkAndReclaimFiles();
- assertEquals(0, journal.getDataFilesCount());
-
- System.out.println("Second one **********************************");
- System.out.println(journal.debug());
-
- assertEquals(0, journal.getDataFilesCount());
-
- }
-
- public void testAutomaticCleanup() throws Exception
- {
- setup(2, 3 * 1024, true);
-
- createJournal();
- startJournal();
- load();
-
- add(50);
- delete(50);
-
- addTx(1, 1, 2);
- journal.forceMoveNextFile();
- updateTx(1, 1, 2);
- journal.forceMoveNextFile();
- commit(1);
- System.out.println("Data = " + journal.getDataFilesCount());
-
- journal.forceMoveNextFile();
-
- delete(1);
-
- for (int i = 10; i < 20; i++)
- {
- add(i);
- journal.forceMoveNextFile();
- update(i);
- journal.forceMoveNextFile();
- delete(i);
- }
-
- journal.forceMoveNextFile();
-
- add(100);
- update(100);
-
- for (int i = 101; i < 120; i++)
- {
- add(i);
- journal.forceMoveNextFile();
- update(i);
- journal.forceMoveNextFile();
- delete(i);
- }
-
- journal.checkAndReclaimFiles();
-
- journal.setAutoReclaim(true);
- journal.forceMoveNextFile();
-
-
- // The cleanup is asynchronous, we keep trying the condition until a timeout of 5 seconds
- for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis();)
- {
- // Wait the current task to finish before we test the condition again
- journal.debugWait();
-
- if (journal.getDataFilesCount() == 4)
- {
- break;
- }
- }
-
- assertEquals(4, journal.getDataFilesCount());
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- delete(2, 100);
-
- // moving to the next file, so any deletes will already make way on reclaiming
- journal.forceMoveNextFile();
-
- journal.checkAndReclaimFiles();
-
- assertEquals(0, journal.getDataFilesCount());
-
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- assertEquals(0, journal.getDataFilesCount());
-
- }
-
protected abstract int getAlignment();
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java 2009-02-17 14:26:25 UTC (rev 5880)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java 2009-02-17 15:59:21 UTC (rev 5881)
@@ -24,7 +24,6 @@
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -33,7 +32,6 @@
import org.jboss.messaging.core.journal.impl.Reclaimer;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.util.Pair;
/**
*
@@ -750,8 +748,6 @@
private boolean canDelete;
- private boolean linkedDependency;
-
public void extendOffset(final int delta)
{
}
@@ -798,18 +794,6 @@
negCounts.put(file, c);
}
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.journal.impl.JournalFile#decNegCount(org.jboss.messaging.core.journal.impl.JournalFile)
- */
- public void decNegCount(JournalFile file)
- {
- Integer count = negCounts.get(file);
-
- int c = count == null ? 1 : count.intValue() - 1;
-
- negCounts.put(file, c);
- }
-
public int getPosCount()
{
return posCount;
@@ -879,54 +863,5 @@
{
return transactionIDs;
}
-
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.journal.impl.JournalFile#getTotalNegCount()
- */
- public int getTotalNegCount()
- {
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.journal.impl.JournalFile#setTotalNegCount(int)
- */
- public void setTotalNegCount(int total)
- {
- }
-
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.journal.impl.JournalFile#addCleanupInfo(long, org.jboss.messaging.core.journal.impl.JournalFile)
- */
- public void addCleanupInfo(long id, JournalFile deleteFile)
- {
- }
-
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.journal.impl.JournalFile#getCleanupInfo(long)
- */
- public JournalFile getCleanupInfo(long id)
- {
- return null;
- }
-
- /**
- * @return the linkedDependency
- */
- public boolean isLinkedDependency()
- {
- return linkedDependency;
- }
-
- /**
- * @param linkedDependency the linkedDependency to set
- */
- public void setLinkedDependency(boolean linkedDependency)
- {
- this.linkedDependency = linkedDependency;
- }
-
-
-
}
}
More information about the jboss-cvs-commits
mailing list