[jboss-cvs] JBoss Messaging SVN: r5864 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Feb 13 20:49:45 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-02-13 20:49:45 -0500 (Fri, 13 Feb 2009)
New Revision: 5864
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/journal/JournalCleanupIntegrationTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/journal/Journal.java
trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java
trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/FakeJournalImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
trunk/tests/src/org/jboss/messaging/tests/util/ListJournal.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1427 - Journal Cleanup implementation
Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2009-02-14 00:11:52 UTC (rev 5863)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2009-02-14 01:49:45 UTC (rev 5864)
@@ -83,7 +83,7 @@
// Load
- long load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions) throws Exception;
+ void load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions) throws Exception;
int getAlignment() throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java 2009-02-14 00:11:52 UTC (rev 5863)
+++ trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java 2009-02-14 01:49:45 UTC (rev 5864)
@@ -69,5 +69,10 @@
return r.id == id;
}
+
+ public String toString()
+ {
+ return ("RecordInfo (id=" + id + ", userRecordType = " + userRecordType + ", data.length = " + data.length + ", isUpdate = " + this.isUpdate);
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2009-02-14 00:11:52 UTC (rev 5863)
+++ trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2009-02-14 01:49:45 UTC (rev 5864)
@@ -22,6 +22,9 @@
package org.jboss.messaging.core.journal;
+import org.jboss.messaging.core.journal.impl.JournalFile;
+
+
/**
*
* A TestableJournal
@@ -32,13 +35,17 @@
*/
public interface TestableJournal extends Journal
{
- void checkAndReclaimFiles() throws Exception;
+ int checkAndReclaimFiles() throws Exception;
int getDataFilesCount();
int getFreeFilesCount();
int getOpenedFilesCount();
+
+ void cleanup(int fileID) throws Exception;
+
+ JournalFile getJournalFile(int fileID);
int getIDMapSize();
@@ -63,7 +70,7 @@
/** This method could be promoted to {@link Journal} interface when we decide to use the loadManager
* instead of load(List,List)
*/
- long load(LoadManager reloadManager) throws Exception;
+ void load(LoadManager reloadManager) throws Exception;
void forceMoveNextFile() throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2009-02-14 00:11:52 UTC (rev 5863)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2009-02-14 01:49:45 UTC (rev 5864)
@@ -31,6 +31,7 @@
* TODO combine this with JournalFileImpl
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
public interface JournalFile
@@ -39,6 +40,30 @@
void incNegCount(JournalFile file);
+ void decNegCount(JournalFile file);
+
+ /** Total Negative from other files to this file */
+ int getTotalNegCount();
+
+ /** Set by the Reclaimer */
+ void setTotalNegCount(int total);
+
+ /**
+ *
+ * A list of problematic records that would cause a linked-list effect between two files
+ * Information we will need in order to cleanup necessary delete records.
+ *
+ * To avoid a linked-list effect, we physically remove deleted records from other files, when cleaning up
+ * */
+ void addCleanupInfo(long id, JournalFile deleteFile);
+
+ /**
+ * A list of problematic records that would cause a linked-list effect between two files
+ * @param id
+ * @return The list
+ */
+ JournalFile getCleanupInfo(long id);
+
int getPosCount();
void incPosCount();
@@ -46,7 +71,18 @@
void decPosCount();
void setCanReclaim(boolean canDelete);
+
+ /**
+ * Property marking if this file is holding another file from reclaiming because of pending deletes.
+ * */
+ void setLinkedDependency(boolean hasDependencies);
+
+ /**
+ * @see JournalFile#setLinkedDependency(boolean)
+ * */
+ boolean isLinkedDependency();
+
boolean isCanReclaim();
void extendOffset(final int delta);
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2009-02-14 00:11:52 UTC (rev 5863)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2009-02-14 01:49:45 UTC (rev 5864)
@@ -22,13 +22,16 @@
package org.jboss.messaging.core.journal.impl;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.util.Pair;
/**
*
@@ -42,7 +45,7 @@
{
private static final Logger log = Logger.getLogger(JournalFileImpl.class);
- private final SequentialFile file;
+ private SequentialFile file;
private final int orderingID;
@@ -50,10 +53,18 @@
private final AtomicInteger posCount = new AtomicInteger(0);
+ private int totalNegCount;
+
private boolean canReclaim;
+
+ private boolean linkedDependency;
+
private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
+ // When removing an ID on cleanup, we need to know where the delete is coming from
+ private final ConcurrentMap<Long, JournalFile> cleanupIDs = new ConcurrentHashMap<Long, JournalFile>();
+
public JournalFileImpl(final SequentialFile file, final int orderingID)
{
this.file = file;
@@ -61,6 +72,35 @@
this.orderingID = orderingID;
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.impl.JournalFile#addCleanupInfo(long, org.jboss.messaging.core.journal.impl.JournalFile)
+ */
+ public void addCleanupInfo(final long id, final JournalFile deleteFile)
+ {
+ cleanupIDs.put(id, deleteFile);
+ }
+
+ public JournalFile getCleanupInfo(final long id)
+ {
+ return cleanupIDs.get(id);
+ }
+
+ /**
+ * @return the totalNegCount
+ */
+ public int getTotalNegCount()
+ {
+ return totalNegCount;
+ }
+
+ /**
+ * @param totalNegCount the totalNegCount to set
+ */
+ public void setTotalNegCount(final int totalNegCount)
+ {
+ this.totalNegCount = totalNegCount;
+ }
+
public int getPosCount()
{
return posCount.intValue();
@@ -81,6 +121,11 @@
getOrCreateNegCount(file).incrementAndGet();
}
+ public void decNegCount(final JournalFile file)
+ {
+ getOrCreateNegCount(file).decrementAndGet();
+ }
+
public int getNegCount(final JournalFile file)
{
AtomicInteger count = negCounts.get(file);
@@ -130,6 +175,23 @@
return file;
}
+
+ /**
+ * @return the linkedDependency
+ */
+ public boolean isLinkedDependency()
+ {
+ return linkedDependency;
+ }
+
+ /**
+ * @param linkedDependency the linkedDependency to set
+ */
+ public void setLinkedDependency(boolean linkedDependency)
+ {
+ this.linkedDependency = linkedDependency;
+ }
+
@Override
public String toString()
{
@@ -170,4 +232,5 @@
return count;
}
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-02-14 00:11:52 UTC (rev 5863)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-02-14 01:49:45 UTC (rev 5864)
@@ -45,6 +45,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -82,6 +83,11 @@
public class JournalImpl implements TestableJournal
{
+ /**
+ *
+ */
+ private static final int MAX_LINKED_JOURNAL_FILES = 10;
+
// Constants -----------------------------------------------------
private static final int STATE_STOPPED = 0;
@@ -157,6 +163,20 @@
public static final byte ROLLBACK_RECORD = 19;
+ // Used by cleanup
+ public static final byte CLEANED_ADD_RECORD = 20;
+
+ // Used by cleanup
+ public static final byte CLEANED_ADD_RECORD_TX = 21;
+
+ // Used by cleanup
+ public static final byte CLEANED_UPDATE_RECORD = 22;
+
+ // Used by cleanup
+ public static final byte CLEANED_UPDATE_RECORD_TX = 23;
+
+ public static final byte LAST_RECORD_ID = CLEANED_UPDATE_RECORD_TX;
+
public static final byte FILL_CHARACTER = (byte)'J';
// Attributes ----------------------------------------------------
@@ -296,20 +316,8 @@
throw new IllegalStateException("Journal must be loaded first");
}
- int recordLength = record.getEncodeSize();
+ ByteBufferWrapper bb = generateAddRecord(true, -1, id, recordType, record);
- int size = SIZE_ADD_RECORD + recordLength;
-
- ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
-
- bb.putByte(ADD_RECORD);
- bb.putInt(-1); // skip ID part
- bb.putLong(id);
- bb.putInt(recordLength);
- bb.putByte(recordType);
- record.encode(bb);
- bb.putInt(size);
-
try
{
JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
@@ -348,18 +356,8 @@
throw new IllegalStateException("Cannot find add info " + id);
}
- int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
+ ByteBufferWrapper bb = generateUpdateRecord(true, -1, id, recordType, record);
- ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
-
- bb.putByte(UPDATE_RECORD);
- bb.putInt(-1); // skip ID part
- bb.putLong(id);
- bb.putInt(record.getEncodeSize());
- bb.putByte(recordType);
- record.encode(bb);
- bb.putInt(size);
-
try
{
JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
@@ -406,7 +404,7 @@
{
JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
- posFiles.addDelete(usedFile);
+ posFiles.addDelete(id, usedFile);
}
finally
{
@@ -436,22 +434,9 @@
{
throw new IllegalStateException("Journal must be loaded first");
}
-
- int recordLength = record.getEncodeSize();
- int size = SIZE_ADD_RECORD_TX + recordLength;
+ ByteBufferWrapper bb = generateAddTransactionalRecord(true, -1, txID, id, recordType, record);
- ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
-
- bb.putByte(ADD_RECORD_TX);
- bb.putInt(-1); // skip ID part
- bb.putLong(txID);
- bb.putLong(id);
- bb.putInt(recordLength);
- bb.putByte(recordType);
- record.encode(bb);
- bb.putInt(size);
-
try
{
JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
@@ -491,19 +476,8 @@
throw new IllegalStateException("Journal must be loaded first");
}
- int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
+ ByteBufferWrapper bb = generateUpdateRecordTransactional(true, -1, txID, id, recordType, record);
- ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
-
- bb.putByte(UPDATE_RECORD_TX);
- bb.putInt(-1); // skip ID part
- bb.putLong(txID);
- bb.putLong(id);
- bb.putInt(record.getEncodeSize());
- bb.putByte(recordType);
- record.encode(bb);
- bb.putInt(size);
-
try
{
JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
@@ -779,33 +753,221 @@
}
+ public void cleanup(final JournalFile journalFile) throws Exception
+ {
+
+ final int fileID = journalFile.getOrderingID();
+
+ final SequentialFile sf = journalFile.getFile();
+
+ sf.open(maxAIO);
+
+ try
+ {
+ readJournalFile(journalFile, new JournalReader()
+ {
+
+ public void addRecord(final int recordPos, final RecordInfo recordInfo) throws Exception
+ {
+ JournalFile cleanupFile = journalFile.getCleanupInfo(recordInfo.id);
+ if (cleanupFile != null)
+ {
+ if (trace)
+ {
+ trace("Cleaning addRecord id = " + recordInfo.id);
+ }
+
+ ByteBufferWrapper buffer = generateAddRecord(false,
+ fileID,
+ recordInfo.id,
+ recordInfo.userRecordType,
+ new ByteArrayEncoding(recordInfo.data));
+
+ buffer.rewind();
+
+ sf.position(recordPos);
+ sf.write(buffer.getBuffer(), false);
+
+ // Eliminating the dependency between a and b
+
+ cleanupFile.decNegCount(journalFile);
+ journalFile.decPosCount();
+ }
+
+ }
+
+ public void cleanedAddRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
+ {
+ if (trace)
+ {
+ trace("Ignored already cleaned TXrecord " + recordInfo.id + ", transactionID = " + transactionID);
+ }
+ }
+
+ public void cleanedAddRecord(final int recordPos, final RecordInfo recordInfo)
+ {
+ if (trace)
+ {
+ trace("Ignoring already cleaned record " + recordInfo.id);
+ }
+ }
+
+ public void addRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo) throws Exception
+ {
+ JournalFile cleanupFile = journalFile.getCleanupInfo(recordInfo.id);
+ if (cleanupFile != null)
+ {
+ if (trace)
+ {
+ trace("Cleaning addRecordTX record id = " + recordInfo.id + " transactionID = " + transactionID);
+ }
+
+ ByteBufferWrapper bb = generateAddTransactionalRecord(false,
+ fileID,
+ transactionID,
+ recordInfo.id,
+ recordInfo.userRecordType,
+ new ByteArrayEncoding(recordInfo.data));
+
+ bb.rewind();
+
+ sf.position(recordPos);
+ sf.write(bb.getBuffer(), false);
+
+ // Eliminating the dependency between a and b
+
+ cleanupFile.decNegCount(journalFile);
+ journalFile.decPosCount();
+ }
+ }
+
+ public void commitRecord(final int recordPos, final long transactionID, final byte[] summaryData)
+ {
+ }
+
+ public void deleteRecord(final int recordPos, final long recordID)
+ {
+ }
+
+ public void deleteRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
+ {
+ }
+
+ public void markAsDataFile()
+ {
+ }
+
+ public void prepareRecord(final int recordPos,
+ final long transactionID,
+ final byte[] extraData,
+ final byte[] summaryData)
+ {
+ }
+
+ public void rollbackRecord(final int recordPos, final long transactionID)
+ {
+ }
+
+ public void updateRecord(final int recordPos, final RecordInfo recordInfo) throws Exception
+ {
+ JournalFile cleanupFile = journalFile.getCleanupInfo(recordInfo.id);
+ if (cleanupFile != null)
+ {
+ if (trace)
+ {
+ trace("Cleaning updateRecord = " + recordInfo);
+ }
+
+ ByteBufferWrapper bb = generateUpdateRecord(false,
+ fileID,
+ recordInfo.id,
+ recordInfo.userRecordType,
+ new ByteArrayEncoding(recordInfo.data));
+
+ bb.rewind();
+
+ sf.position(recordPos);
+ sf.write(bb.getBuffer(), false);
+
+ // Eliminating the dependency between a and b
+
+ cleanupFile.decNegCount(journalFile);
+ journalFile.decPosCount();
+ }
+ }
+
+ public void updateRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo) throws Exception
+ {
+ JournalFile cleanupFile = journalFile.getCleanupInfo(recordInfo.id);
+ if (cleanupFile != null)
+ {
+ if (trace)
+ {
+ trace("Cleaning updateRecord = " + recordInfo);
+ }
+
+ ByteBufferWrapper bb = generateUpdateRecordTransactional(false,
+ fileID,
+ transactionID,
+ recordInfo.id,
+ recordInfo.userRecordType,
+ new ByteArrayEncoding(recordInfo.data));
+ bb.rewind();
+
+ sf.position(recordPos);
+ sf.write(bb.getBuffer(), false);
+
+ // Eliminating the dependency between a and b
+
+ cleanupFile.decNegCount(journalFile);
+ journalFile.decPosCount();
+ }
+ }
+
+ public void cleanedUpdateRecord(final int recordPos, final RecordInfo recordInfo)
+ {
+
+ }
+
+ public void cleanedUpdateRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
+ {
+ }
+
+ });
+ }
+ finally
+ {
+ sf.close();
+ }
+ }
+
/**
* @see JournalImpl#load(LoadManager)
*/
- public synchronized long load(final List<RecordInfo> committedRecords,
+ public synchronized void load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions) throws Exception
{
final Set<Long> recordsToDelete = new HashSet<Long>();
final List<RecordInfo> records = new ArrayList<RecordInfo>();
- long maxID = load(new LoadManager()
+ load(new LoadManager()
{
- public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
+ public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
{
preparedTransactions.add(preparedTransaction);
}
- public void addRecord(RecordInfo info)
+ public void addRecord(final RecordInfo info)
{
records.add(info);
}
- public void updateRecord(RecordInfo info)
+ public void updateRecord(final RecordInfo info)
{
records.add(info);
}
- public void deleteRecord(long id)
+ public void deleteRecord(final long id)
{
recordsToDelete.add(id);
}
@@ -819,7 +981,7 @@
}
}
- return maxID;
+ return;
}
/**
@@ -857,265 +1019,124 @@
* <p> * FileID and NumberOfElements are the transaction summary, and they will be repeated (N)umberOfFiles times </p>
*
* */
- public synchronized long load(final LoadManager loadManager) throws Exception
+ public synchronized void load(final LoadManager loadManager) throws Exception
{
if (state != STATE_STARTED)
{
throw new IllegalStateException("Journal must be in started state");
}
- Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
+ final Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
- List<JournalFile> orderedFiles = orderFiles();
+ final List<JournalFile> orderedFiles = orderFiles();
int lastDataPos = SIZE_HEADER;
- long maxID = -1;
+ for (JournalFile loopFile : orderedFiles)
+ {
+ final AtomicBoolean hasData = new AtomicBoolean(false);
+ final JournalFile file = loopFile;
- for (JournalFile file : orderedFiles)
- {
file.getFile().open(1);
- ByteBuffer bb = fileFactory.newBuffer(fileSize);
-
- int bytesRead = file.getFile().read(bb);
-
- if (bytesRead != fileSize)
+ if (trace)
{
- // FIXME - We should extract everything we can from this file
- // and then we shouldn't ever reuse this file on reclaiming (instead
- // reclaim on different size files would aways throw the file away)
- // rather than throw ISE!
- // We don't want to leave the user with an unusable system
- throw new IllegalStateException("File is wrong size " + bytesRead +
- " expected " +
- fileSize +
- " : " +
- file.getFile().getFileName());
+ trace("loading file " + file);
}
- // First long is the ordering timestamp, we just jump its position
- bb.position(SIZE_HEADER);
-
- boolean hasData = false;
-
- while (bb.hasRemaining())
+ try
{
- final int pos = bb.position();
- byte recordType = bb.get();
-
- if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
+ // We use an inner method here, as the same method read could be used by both load and cleanup
+ // We reuse the same method that will treat the data-format for the journal file on both cases
+ final int fileLastPos = readJournalFile(file, new JournalReader()
{
- // I - We scan for any valid record on the file. If a hole
- // happened on the middle of the file we keep looking until all
- // the possibilities are gone
- continue;
- }
- if (bb.position() + SIZE_INT > fileSize)
- {
- // II - Ignore this record, lets keep looking
- continue;
- }
+ public void addRecord(final int recordPos, final RecordInfo info)
+ {
+ if (trace)
+ {
+ trace("AddRecord: " + info);
+ }
+ loadManager.addRecord(info);
- // III - Every record has the file-id.
- // This is what supports us from not re-filling the whole file
- int readFileId = bb.getInt();
+ posFilesMap.put(info.id, new PosFiles(file));
- // IV - This record is from a previous file-usage. The file was
- // reused and we need to ignore this record
- if (readFileId != file.getOrderingID())
- {
- // If a file has damaged records, we make it a dataFile, and the
- // next reclaiming will fix it
- hasData = true;
+ hasData.set(true);
- bb.position(pos + 1);
-
- continue;
- }
-
- long transactionID = 0;
-
- if (isTransaction(recordType))
- {
- if (bb.position() + SIZE_LONG > fileSize)
- {
- continue;
}
- transactionID = bb.getLong();
- }
-
- long recordID = 0;
-
- if (!isCompleteTransaction(recordType))
- {
- if (bb.position() + SIZE_LONG > fileSize)
+ public void updateRecord(final int recordPos, final RecordInfo recordInfo)
{
- continue;
- }
+ if (trace)
+ {
+ trace("UpdateRecord: " + recordInfo);
+ }
- recordID = bb.getLong();
+ loadManager.updateRecord(recordInfo);
- maxID = Math.max(maxID, recordID);
- }
+ hasData.set(true);
- // We use the size of the record to validate the health of the
- // record.
- // (V) We verify the size of the record
+ PosFiles posFiles = posFilesMap.get(recordInfo.id);
- // The variable record portion used on Updates and Appends
- int variableSize = 0;
+ if (posFiles != null)
+ {
+ // It's legal for this to be null. The file(s) with the insert may
+ // have been deleted
+ // just leaving some updates in this file
- // Used to hold extra data on transaction prepares
- int preparedTransactionExtraDataSize = 0;
-
- byte userRecordType = 0;
-
- byte record[] = null;
-
- if (isContainsBody(recordType))
- {
- if (bb.position() + SIZE_INT > fileSize)
- {
- continue;
+ posFiles.addUpdateFile(file);
+ }
}
- variableSize = bb.getInt();
-
- if (bb.position() + variableSize > fileSize)
+ public void deleteRecord(final int recordPos, final long recordID)
{
- log.warn("Record at position " + pos +
- " file:" +
- file.getFile().getFileName() +
- " is corrupted and it is being ignored");
- continue;
- }
+ if (trace)
+ {
+ trace("DeleteRecord " + recordID);
+ }
+ loadManager.deleteRecord(recordID);
- if (recordType != DELETE_RECORD_TX)
- {
- userRecordType = bb.get();
- }
+ hasData.set(true);
- record = new byte[variableSize];
+ PosFiles posFiles = posFilesMap.remove(recordID);
- bb.get(record);
- }
+ if (posFiles != null)
+ {
+ posFiles.addDelete(recordID, file);
+ }
- if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
- {
- if (recordType == PREPARE_RECORD)
- {
- // Add the variable size required for preparedTransactions
- preparedTransactionExtraDataSize = bb.getInt();
}
- // Both commit and record contain the recordSummary, and this is
- // used to calculate the record-size on both record-types
- variableSize += bb.getInt() * SIZE_INT * 2;
- }
- int recordSize = getRecordSize(recordType);
-
- // VI - this is completing V, We will validate the size at the end
- // of the record,
- // But we avoid buffer overflows by damaged data
- if (pos + recordSize + variableSize + preparedTransactionExtraDataSize > fileSize)
- {
- // Avoid a buffer overflow caused by damaged data... continue
- // scanning for more records...
- log.warn("Record at position " + pos +
- " file:" +
- file.getFile().getFileName() +
- " is corrupted and it is being ignored");
- // If a file has damaged records, we make it a dataFile, and the
- // next reclaiming will fix it
- hasData = true;
-
- continue;
- }
-
- int oldPos = bb.position();
-
- bb.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
-
- int checkSize = bb.getInt();
-
- // VII - The checkSize at the end has to match with the size
- // informed at the beggining.
- // This is like testing a hash for the record. (We could replace the
- // checkSize by some sort of calculated hash)
- if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
- {
- log.warn("Record at position " + pos +
- " file:" +
- file.getFile().getFileName() +
- " is corrupted and it is being ignored");
-
- // If a file has damaged records, we make it a dataFile, and the
- // next reclaiming will fix it
- hasData = true;
-
- bb.position(pos + SIZE_BYTE);
-
- continue;
- }
-
- bb.position(oldPos);
-
- // At this point everything is checked. So we relax and just load
- // the data now.
-
- switch (recordType)
- {
- case ADD_RECORD:
+ public void cleanedAddRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
{
- loadManager.addRecord(new RecordInfo(recordID, userRecordType, record, false));
+ if (trace)
+ {
+ trace("cleanedAddRecordTX: " + recordInfo);
+ }
- posFilesMap.put(recordID, new PosFiles(file));
+ JournalTransaction tnp = transactionInfos.get(transactionID);
- hasData = true;
-
- break;
- }
- case UPDATE_RECORD:
- {
- loadManager.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
-
- hasData = true;
-
- PosFiles posFiles = posFilesMap.get(recordID);
-
- if (posFiles != null)
+ if (tnp == null)
{
- // It's legal for this to be null. The file(s) with the may
- // have been deleted
- // just leaving some updates in this file
+ tnp = new JournalTransaction();
- posFiles.addUpdateFile(file);
+ transactionInfos.put(transactionID, tnp);
}
- break;
+ tnp.addSummaryOnly(file);
}
- case DELETE_RECORD:
+
+ public void addRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
{
- loadManager.deleteRecord(recordID);
- hasData = true;
-
- PosFiles posFiles = posFilesMap.remove(recordID);
-
- if (posFiles != null)
+ if (trace)
{
- posFiles.addDelete(file);
+ trace((recordInfo.isUpdate ? "updateRecordTX: " : "addRecordTX: ") + recordInfo +
+ ", txid = " +
+ transactionID);
}
- break;
- }
- case ADD_RECORD_TX:
- case UPDATE_RECORD_TX:
- {
TransactionHolder tx = transactions.get(transactionID);
if (tx == null)
@@ -1125,7 +1146,7 @@
transactions.put(transactionID, tx);
}
- tx.recordInfos.add(new RecordInfo(recordID, userRecordType, record, recordType == UPDATE_RECORD_TX));
+ tx.recordInfos.add(recordInfo);
JournalTransaction tnp = transactionInfos.get(transactionID);
@@ -1136,14 +1157,24 @@
transactionInfos.put(transactionID, tnp);
}
- tnp.addPositive(file, recordID);
+ tnp.addPositive(file, recordInfo.id);
- hasData = true;
+ hasData.set(true);
+ }
- break;
+ public void updateRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
+ {
+ // There is no difference here, so using the same method
+ addRecordTX(recordPos, transactionID, recordInfo);
}
- case DELETE_RECORD_TX:
+
+ public void deleteRecordTX(final int recordPos, final long transactionID, final RecordInfo recordInfo)
{
+ if (trace)
+ {
+ trace("deleteRecordTX " + recordInfo + ", txid = " + transactionID);
+ }
+
TransactionHolder tx = transactions.get(transactionID);
if (tx == null)
@@ -1153,7 +1184,7 @@
transactions.put(transactionID, tx);
}
- tx.recordsToDelete.add(new RecordInfo(recordID, (byte)0, record, true));
+ tx.recordsToDelete.add(recordInfo);
JournalTransaction tnp = transactionInfos.get(transactionID);
@@ -1164,14 +1195,21 @@
transactionInfos.put(transactionID, tnp);
}
- tnp.addNegative(file, recordID);
+ tnp.addNegative(file, recordInfo.id);
- hasData = true;
+ hasData.set(true);
+ }
- break;
- }
- case PREPARE_RECORD:
+ public void prepareRecord(final int recordPos,
+ final long transactionID,
+ final byte[] extraData,
+ final byte[] summaryData)
{
+ if (trace)
+ {
+ trace("prepareRecordTX: txid = " + transactionID);
+ }
+
TransactionHolder tx = transactions.get(transactionID);
if (tx == null)
@@ -1182,12 +1220,9 @@
transactions.put(transactionID, tx);
}
- byte extraData[] = new byte[preparedTransactionExtraDataSize];
-
- bb.get(extraData);
-
// Pair <FileID, NumberOfElements>
- Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
+ Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(summaryData.length,
+ ByteBuffer.wrap(summaryData));
tx.prepared = true;
@@ -1210,22 +1245,28 @@
}
else
{
- log.warn("Prepared transaction " + transactionID + " wasn't considered completed, it will be ignored");
+ log.warn("Prepared transaction " + transactionID +
+ " wasn't considered completed, it will be ignored");
tx.invalid = true;
}
- hasData = true;
+ hasData.set(true);
+ }
- break;
- }
- case COMMIT_RECORD:
+ public void commitRecord(final int recordPos, final long transactionID, final byte[] summaryData)
{
+ if (trace)
+ {
+ trace("commitRecord: txid = " + transactionID);
+ }
+
TransactionHolder tx = transactions.remove(transactionID);
// We need to read it even if transaction was not found, or
// the reading checks would fail
// Pair <OrderId, NumberOfElements>
- Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
+ Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(summaryData.length,
+ ByteBuffer.wrap(summaryData));
// The commit could be alone on its own journal-file and the
// whole transaction body was reclaimed but not the
@@ -1274,13 +1315,18 @@
journalTransaction.forget();
}
- hasData = true;
+ hasData.set(true);
}
- break;
}
- case ROLLBACK_RECORD:
+
+ public void rollbackRecord(final int recordPos, final long transactionID)
{
+ if (trace)
+ {
+ trace("rollbackRecord: txid = " + transactionID);
+ }
+
TransactionHolder tx = transactions.remove(transactionID);
// The rollback could be alone on its own journal-file and the
@@ -1300,43 +1346,79 @@
// Rollbacks.. We will ignore the data anyway.
tnp.rollback(file);
- hasData = true;
+ hasData.set(true);
}
- break;
}
- default:
+
+ public void markAsDataFile()
{
- throw new IllegalStateException("Journal " + file.getFile().getFileName() +
- " is corrupt, invalid record type " +
- recordType);
+ if (trace)
+ {
+ trace("markAsDataFile");
+ }
+
+ hasData.set(true);
}
- }
- checkSize = bb.getInt();
+ public void cleanedAddRecord(final int recordPos, final RecordInfo recordInfo)
+ {
+ if (trace)
+ {
+ trace("cleanedAddRecord: " + recordInfo);
+ }
- // This is a sanity check about the loading code itself.
- // If this checkSize doesn't match, it means the reading method is
- // not doing what it was supposed to do
- if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+ }
+
+ public void cleanedUpdateRecord(final int recordPos, final RecordInfo recordInfo)
+ {
+ if (trace)
+ {
+ trace("cleanedUpdateRecord: " + recordInfo);
+ }
+ }
+
+ public void cleanedUpdateRecordTX(final int recordPos,
+ final long transactionID,
+ final RecordInfo recordInfo)
+ {
+ if (trace)
+ {
+ trace("cleanedUpdateRecordTx: " + recordInfo + ", txID = " + transactionID);
+ }
+
+ JournalTransaction tnp = transactionInfos.get(transactionID);
+
+ if (tnp == null)
+ {
+ tnp = new JournalTransaction();
+
+ transactionInfos.put(transactionID, tnp);
+ }
+
+ tnp.addSummaryOnly(file);
+
+ }
+
+ });
+
+ if (hasData.get())
{
- throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
+ lastDataPos = fileLastPos;
+ dataFiles.add(file);
}
+ else
+ {
+ // Empty dataFiles with no data
+ freeFiles.add(file);
+ }
- lastDataPos = bb.position();
}
-
- file.getFile().close();
-
- if (hasData)
+ finally
{
- dataFiles.add(file);
+ file.getFile().close();
}
- else
- {
- // Empty dataFiles with no data
- freeFiles.add(file);
- }
+
}
// Create any more files we need
@@ -1424,7 +1506,7 @@
checkAndReclaimFiles();
- return maxID;
+ return;
}
public int getAlignment() throws Exception
@@ -1432,9 +1514,31 @@
return fileFactory.getAlignment();
}
- // TestableJournal implementation
- // --------------------------------------------------------------
+ // TestableJournal implementation --------------------------------------------------------------
+ public JournalFile getJournalFile(final int fileID)
+ {
+ for (JournalFile file : dataFiles)
+ {
+ if (file.getOrderingID() == fileID)
+ {
+ return file;
+ }
+ }
+
+ return null;
+ }
+
+ public void cleanup(final int fileID) throws Exception
+ {
+ JournalFile file = getJournalFile(fileID);
+
+ if (file != null)
+ {
+ cleanup(file);
+ }
+ }
+
public void setAutoReclaim(final boolean autoReclaim)
{
this.autoReclaim = autoReclaim;
@@ -1456,8 +1560,12 @@
builder.append("DataFile:" + file +
" posCounter = " +
file.getPosCount() +
+ " totalNegative = " +
+ file.getTotalNegCount() +
" reclaimStatus = " +
file.isCanReclaim() +
+ " linkedDependency = " +
+ file.isLinkedDependency() +
"\n");
if (file instanceof JournalFileImpl)
{
@@ -1518,10 +1626,19 @@
}
- public void checkAndReclaimFiles() throws Exception
+ public int checkAndReclaimFiles() throws Exception
{
- checkReclaimStatus();
+ Set<JournalFile> empty = Collections.emptySet();
+ return checkAndReclaimFiles(empty, false);
+ }
+ /**
+ * When we cleanup a file, we reschedule a check, but we can't cleanup the same file again, or we may get on an infinite loop
+ * */
+ private int checkAndReclaimFiles(final Set<JournalFile> cleanedFiles, final boolean performCleanup) throws Exception
+ {
+ int secondCriterionCount = checkReclaimStatus();
+
for (JournalFile file : dataFiles)
{
if (file.isCanReclaim())
@@ -1552,6 +1669,71 @@
}
}
}
+
+ if (performCleanup)
+ {
+ if (secondCriterionCount > MAX_LINKED_JOURNAL_FILES)
+ {
+ JournalFile cleanupFile = null;
+
+ // Will look for the first file that has many dependencies.
+ // This is because a single file could be holding all the records
+ for (JournalFile file : dataFiles)
+ {
+ if (file.isLinkedDependency())
+ {
+ if (cleanedFiles.contains(file))
+ {
+ if (trace)
+ {
+ trace("File " + file + " was already cleaned, getting next one");
+ }
+
+ // However in some exceptional cases, the file could still have a dependency after cleaned (commits on different files for instance)
+ // Because of that, on the subsequent scheduled cleanups (if any), we need to ensure we don't reprocess the same file or we would
+ // be in an infinite loop
+ continue;
+ }
+
+ cleanupFile = file;
+ break;
+ }
+ }
+
+ if (cleanupFile != null)
+ {
+ log.info("System has too many linked files on deletes(" + secondCriterionCount +
+ "), performing a cleanup on " + cleanupFile);
+
+ if (trace)
+ {
+ trace("Cleaning up " + cleanupFile);
+ }
+
+ cleanedFiles.add(cleanupFile);
+ cleanup(cleanupFile);
+
+ filesExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ checkAndReclaimFiles(cleanedFiles, true);
+ }
+ catch (Exception e)
+ {
+ log.error(e.getMessage(), e);
+ }
+ }
+ });
+
+ }
+
+ }
+ }
+
+ return secondCriterionCount;
}
public int getDataFilesCount()
@@ -1639,7 +1821,7 @@
public synchronized void stop() throws Exception
{
trace("Stopping the journal");
-
+
if (state == STATE_STOPPED)
{
throw new IllegalStateException("Journal is already stopped");
@@ -1684,17 +1866,168 @@
}
}
- // Public
- // -----------------------------------------------------------------------------
+ // Public --------------------------------------------------------------------------------
- // Private
- // -----------------------------------------------------------------------------
+ // Private -------------------------------------------------------------------------------
- private void checkReclaimStatus() throws Exception
+
+ // Private Methods that encapsulate data format generation -------------------------------
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @return
+ */
+ private ByteBufferWrapper generateAddRecord(final boolean addRecord,
+ final int fileID,
+ final long id,
+ final byte userRecordType,
+ final EncodingSupport record)
{
+ int recordLength = record.getEncodeSize();
+
+ int size = SIZE_ADD_RECORD + recordLength;
+
+ ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+
+ if (addRecord)
+ {
+ bb.putByte(ADD_RECORD);
+ }
+ else
+ {
+ bb.putByte(CLEANED_ADD_RECORD);
+ }
+
+ bb.putInt(fileID);
+ bb.putLong(id);
+ bb.putInt(recordLength);
+ bb.putByte(userRecordType);
+ record.encode(bb);
+ bb.putInt(size);
+
+ return bb;
+ }
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @return
+ */
+ private ByteBufferWrapper generateUpdateRecord(final boolean isUpdateRecord,
+ final int fileID,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record)
+ {
+ int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
+
+ ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+
+ if (isUpdateRecord)
+ {
+ bb.putByte(UPDATE_RECORD);
+ }
+ else
+ {
+ bb.putByte(CLEANED_UPDATE_RECORD);
+ }
+ bb.putInt(fileID); // skip ID part
+ bb.putLong(id);
+ bb.putInt(record.getEncodeSize());
+ bb.putByte(recordType);
+ record.encode(bb);
+ bb.putInt(size);
+ return bb;
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @param recordType
+ * @param record
+ * @return
+ */
+ private ByteBufferWrapper generateAddTransactionalRecord(final boolean addRecord,
+ final int fileID,
+ final long txID,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record)
+ {
+ int recordLength = record.getEncodeSize();
+
+ int size = SIZE_ADD_RECORD_TX + recordLength;
+
+ ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+
+ if (addRecord)
+ {
+ bb.putByte(ADD_RECORD_TX);
+ }
+ else
+ {
+ bb.putByte(CLEANED_ADD_RECORD_TX);
+ }
+ bb.putInt(fileID); // skip ID part
+ bb.putLong(txID);
+ bb.putLong(id);
+ bb.putInt(recordLength);
+ bb.putByte(recordType);
+ record.encode(bb);
+ bb.putInt(size);
+ return bb;
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @param recordType
+ * @param record
+ * @return
+ */
+ private ByteBufferWrapper generateUpdateRecordTransactional(final boolean isUpdateRecord,
+ final int fileID,
+ final long txID,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record)
+ {
+ int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
+
+ ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
+
+ if (isUpdateRecord)
+ {
+ bb.putByte(UPDATE_RECORD_TX);
+ }
+ else
+ {
+ bb.putByte(CLEANED_UPDATE_RECORD_TX);
+ }
+
+ bb.putInt(fileID); // skip ID part
+ bb.putLong(txID);
+ bb.putLong(id);
+ bb.putInt(record.getEncodeSize());
+ bb.putByte(recordType);
+ record.encode(bb);
+ bb.putInt(size);
+ return bb;
+ }
+
+ // -------------------------------------------------------------------------------------------------------
+
+ /**
+ * @return the number of linkedFiles
+ */
+ private int checkReclaimStatus() throws Exception
+ {
JournalFile[] files = new JournalFile[dataFiles.size()];
- reclaimer.scan(dataFiles.toArray(files));
+ return reclaimer.scan(dataFiles.toArray(files));
}
// Discard the old JournalFile and set it with a new ID
@@ -1702,6 +2035,17 @@
{
int newOrderingID = generateOrderingID();
+ return reinitializeFile(file, newOrderingID);
+ }
+
+ /**
+ * @param file
+ * @param newOrderingID
+ * @return
+ * @throws Exception
+ */
+ private JournalFile reinitializeFile(final JournalFile file, final int newOrderingID) throws Exception
+ {
SequentialFile sf = file.getFile();
sf.open(1);
@@ -1871,6 +2215,8 @@
{
return recordType == ADD_RECORD_TX || recordType == UPDATE_RECORD_TX ||
recordType == DELETE_RECORD_TX ||
+ recordType == CLEANED_ADD_RECORD_TX ||
+ recordType == CLEANED_UPDATE_RECORD_TX ||
isCompleteTransaction(recordType);
}
@@ -1881,7 +2227,9 @@
private boolean isContainsBody(final byte recordType)
{
- return recordType >= ADD_RECORD && recordType <= DELETE_RECORD_TX;
+ return recordType >= ADD_RECORD && recordType <= DELETE_RECORD_TX ||
+ recordType >= CLEANED_ADD_RECORD &&
+ recordType <= CLEANED_UPDATE_RECORD_TX;
}
private int getRecordSize(final byte recordType)
@@ -1891,15 +2239,19 @@
switch (recordType)
{
case ADD_RECORD:
+ case CLEANED_ADD_RECORD:
recordSize = SIZE_ADD_RECORD;
break;
case UPDATE_RECORD:
+ case CLEANED_UPDATE_RECORD:
recordSize = SIZE_UPDATE_RECORD;
break;
case ADD_RECORD_TX:
+ case CLEANED_ADD_RECORD_TX:
recordSize = SIZE_ADD_RECORD_TX;
break;
case UPDATE_RECORD_TX:
+ case CLEANED_UPDATE_RECORD_TX:
recordSize = SIZE_UPDATE_RECORD_TX;
break;
case DELETE_RECORD:
@@ -2162,7 +2514,7 @@
{
try
{
- checkAndReclaimFiles();
+ checkAndReclaimFiles(new HashSet<JournalFile>(), true);
}
catch (Exception e)
{
@@ -2192,6 +2544,17 @@
* */
private void pushOpenedFile() throws Exception
{
+ JournalFile nextOpenedFile = openNewFile();
+
+ openedFiles.offer(nextOpenedFile);
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
+ private JournalFile openNewFile() throws Exception
+ {
JournalFile nextOpenedFile = null;
try
{
@@ -2209,8 +2572,7 @@
{
openFile(nextOpenedFile);
}
-
- openedFiles.offer(nextOpenedFile);
+ return nextOpenedFile;
}
private void closeFile(final JournalFile file)
@@ -2284,6 +2646,310 @@
}
}
+ /**
+ * This method encapsulates the Journal file reading, used by both compact and cleanup.
+ *
+ * Instead of duplicating the method of reading the file, we have this method that could be used by both loading a cleanup.
+ *
+ * @param file
+ * @param reader
+ * @return true if it has damaged data
+ * @throws Exception
+ */
+ private int readJournalFile(final JournalFile file, final JournalReader reader) throws Exception
+ {
+ ByteBuffer bb = fileFactory.newBuffer(fileSize);
+
+ file.getFile().read(bb);
+
+ int lastDataPos = SIZE_HEADER;
+
+ // First long is the ordering timestamp, we just jump its position
+ bb.position(SIZE_HEADER);
+
+ while (bb.hasRemaining())
+ {
+ final int recordPos = bb.position();
+
+ byte recordType = bb.get();
+
+ if (recordType < ADD_RECORD || recordType > LAST_RECORD_ID)
+ {
+ // I - We scan for any valid record on the file. If a hole
+ // happened on the middle of the file we keep looking until all
+ // the possibilities are gone
+ continue;
+ }
+
+ if (bb.position() + SIZE_INT > fileSize)
+ {
+ // II - Ignore this record, lets keep looking
+ continue;
+ }
+
+ // III - Every record has the file-id.
+ // This is what supports us from not re-filling the whole file
+ int readFileId = bb.getInt();
+
+ // IV - This record is from a previous file-usage. The file was
+ // reused and we need to ignore this record
+ if (readFileId != file.getOrderingID())
+ {
+ // If a file has damaged records, we make it a dataFile, and the
+ // next reclaiming will fix it
+ reader.markAsDataFile();
+
+ bb.position(recordPos + 1);
+
+ continue;
+ }
+
+ long transactionID = 0;
+
+ if (isTransaction(recordType))
+ {
+ if (bb.position() + SIZE_LONG > fileSize)
+ {
+ continue;
+ }
+
+ transactionID = bb.getLong();
+ }
+
+ long recordID = 0;
+
+ if (!isCompleteTransaction(recordType))
+ {
+ if (bb.position() + SIZE_LONG > fileSize)
+ {
+ continue;
+ }
+
+ recordID = bb.getLong();
+ }
+
+ // We use the size of the record to validate the health of the
+ // record.
+ // (V) We verify the size of the record
+
+ // The variable record portion used on Updates and Appends
+ int variableSize = 0;
+
+ // Used to hold extra data on transaction prepares
+ int preparedTransactionExtraDataSize = 0;
+
+ byte userRecordType = 0;
+
+ byte record[] = null;
+
+ if (isContainsBody(recordType))
+ {
+ if (bb.position() + SIZE_INT > fileSize)
+ {
+ continue;
+ }
+
+ variableSize = bb.getInt();
+
+ if (bb.position() + variableSize > fileSize)
+ {
+ log.warn("Record at position " + recordPos +
+ " file:" +
+ file.getFile().getFileName() +
+ " is corrupted and it is being ignored");
+ continue;
+ }
+
+ if (recordType != DELETE_RECORD_TX)
+ {
+ userRecordType = bb.get();
+ }
+
+ record = new byte[variableSize];
+
+ bb.get(record);
+ }
+
+ if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
+ {
+ if (recordType == PREPARE_RECORD)
+ {
+ // Add the variable size required for preparedTransactions
+ preparedTransactionExtraDataSize = bb.getInt();
+ }
+ // Both commit and record contain the recordSummary, and this is
+ // used to calculate the record-size on both record-types
+ variableSize += bb.getInt() * SIZE_INT * 2;
+ }
+
+ int recordSize = getRecordSize(recordType);
+
+ // VI - this is completing V, We will validate the size at the end
+ // of the record,
+ // But we avoid buffer overflows by damaged data
+ if (recordPos + recordSize + variableSize + preparedTransactionExtraDataSize > fileSize)
+ {
+ // Avoid a buffer overflow caused by damaged data... continue
+ // scanning for more records...
+ log.warn("Record at position " + recordPos +
+ " file:" +
+ file.getFile().getFileName() +
+ " is corrupted and it is being ignored");
+ // If a file has damaged records, we make it a dataFile, and the
+ // next reclaiming will fix it
+ reader.markAsDataFile();
+
+ continue;
+ }
+
+ int oldPos = bb.position();
+
+ bb.position(recordPos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
+
+ int checkSize = bb.getInt();
+
+ // VII - The checkSize at the end has to match with the size
+ // informed at the beggining.
+ // This is like testing a hash for the record. (We could replace the
+ // checkSize by some sort of calculated hash)
+ if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+ {
+ log.warn("Record at position " + recordPos +
+ " file:" +
+ file.getFile().getFileName() +
+ " is corrupted and it is being ignored");
+
+ // If a file has damaged records, we make it a dataFile, and the
+ // next reclaiming will fix it
+ reader.markAsDataFile();
+
+ bb.position(recordPos + SIZE_BYTE);
+
+ continue;
+ }
+
+ bb.position(oldPos);
+
+ // At this point everything is checked. So we relax and just load
+ // the data now.
+
+ switch (recordType)
+ {
+ case ADD_RECORD:
+ {
+ reader.addRecord(recordPos, new RecordInfo(recordID, userRecordType, record, false));
+ break;
+ }
+ case CLEANED_ADD_RECORD:
+ {
+ reader.cleanedAddRecord(recordPos, new RecordInfo(recordID, userRecordType, record, false));
+ break;
+ }
+ case UPDATE_RECORD:
+ {
+ reader.updateRecord(recordPos, new RecordInfo(recordID, userRecordType, record, true));
+
+ break;
+ }
+ case CLEANED_UPDATE_RECORD:
+ {
+ reader.cleanedUpdateRecord(recordPos, new RecordInfo(recordID, userRecordType, record, true));
+
+ break;
+ }
+ case DELETE_RECORD:
+ {
+ reader.deleteRecord(recordPos, recordID);
+
+ break;
+ }
+
+ case CLEANED_ADD_RECORD_TX:
+ {
+ reader.cleanedAddRecordTX(recordPos, transactionID, new RecordInfo(recordID,
+ userRecordType,
+ record,
+ false));
+ break;
+ }
+
+ case ADD_RECORD_TX:
+ {
+ reader.addRecordTX(recordPos, transactionID, new RecordInfo(recordID, userRecordType, record, false));
+ break;
+ }
+ case UPDATE_RECORD_TX:
+ {
+ reader.updateRecordTX(recordPos, transactionID, new RecordInfo(recordID, userRecordType, record, true));
+ break;
+ }
+ case CLEANED_UPDATE_RECORD_TX:
+ {
+ reader.cleanedUpdateRecordTX(recordPos, transactionID, new RecordInfo(recordID,
+ userRecordType,
+ record,
+ true));
+ break;
+ }
+ case DELETE_RECORD_TX:
+ {
+ reader.deleteRecordTX(recordPos, transactionID, new RecordInfo(recordID, (byte)0, record, true));
+ break;
+ }
+ case PREPARE_RECORD:
+ {
+ byte extraData[] = new byte[preparedTransactionExtraDataSize];
+
+ bb.get(extraData);
+
+ byte[] summaryData = new byte[variableSize];
+ bb.get(summaryData);
+
+ reader.prepareRecord(recordPos, transactionID, extraData, summaryData);
+
+ break;
+ }
+ case COMMIT_RECORD:
+ {
+
+ byte[] summaryData = new byte[variableSize];
+ bb.get(summaryData);
+
+ reader.commitRecord(recordPos, transactionID, summaryData);
+
+ break;
+ }
+ case ROLLBACK_RECORD:
+ {
+ reader.rollbackRecord(recordPos, transactionID);
+ break;
+ }
+ default:
+ {
+ throw new IllegalStateException("Journal " + file.getFile().getFileName() +
+ " is corrupt, invalid record type " +
+ recordType);
+ }
+
+ }
+
+ checkSize = bb.getInt();
+
+ // This is a sanity check about the loading code itself.
+ // If this checkSize doesn't match, it means the reading method is
+ // not doing what it was supposed to do
+ if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+ {
+ throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
+ }
+
+ lastDataPos = bb.position();
+ }
+
+ return lastDataPos;
+
+ }
+
public ByteBuffer newBuffer(final int size)
{
return buffersControl.newBuffer(size);
@@ -2359,18 +3025,38 @@
updateFile.incPosCount();
}
- void addDelete(final JournalFile file)
+ void addDelete(final long id, final JournalFile deleteFile)
{
- file.incNegCount(addFile);
+ if (addFile != deleteFile)
+ {
+ addFile.addCleanupInfo(id, deleteFile);
+ }
+ deleteFile.incNegCount(addFile);
+
if (updateFiles != null)
{
- for (JournalFile jf : updateFiles)
+ for (JournalFile updateF : updateFiles)
{
- file.incNegCount(jf);
+ if (addFile != updateF)
+ {
+
+ // cleanup dependency between updateFile and deleteFile
+
+ // Say you have this scenario: (A=Add, U=Update, D=Delete)
+ // File1: A1
+ // File2: U1
+ // File3: D1
+
+ // I need to cleanup the counter between D1 and A1, and the counter between D1 and U1
+ updateF.addCleanupInfo(id, deleteFile);
+ }
+
+ deleteFile.incNegCount(updateF);
}
}
}
+
}
/** Class that will control buffer-reuse */
@@ -2469,6 +3155,12 @@
return numberOfElementsPerFile;
}
+ // Used after cleanup records
+ public void addSummaryOnly(final JournalFile file)
+ {
+ getCounter(file).incrementAndGet();
+ }
+
public void addPositive(final JournalFile file, final long id)
{
getCounter(file).incrementAndGet();
@@ -2526,7 +3218,7 @@
if (posFiles != null)
{
- posFiles.addDelete(n.a);
+ posFiles.addDelete(n.b, n.a);
}
}
}
@@ -2608,8 +3300,7 @@
}
}
-
-
+
private class ByteArrayEncoding implements EncodingSupport
{
@@ -2654,5 +3345,95 @@
}
+ /**
+ *
+ *
+ * Abstraction used to read journal files.
+ *
+ *
+ */
+ private static interface JournalReader
+ {
+ void addRecord(int recordPos, RecordInfo info) throws Exception;
+ /**
+ * @param recordPos
+ * @param transactionID
+ * @param recordInfo
+ */
+ void cleanedUpdateRecordTX(int recordPos, long transactionID, RecordInfo recordInfo) throws Exception;
+
+ /**
+ * @param recordInfo
+ */
+ void cleanedUpdateRecord(int recordPos, RecordInfo recordInfo) throws Exception;
+
+ /**
+ * @param recordPos
+ * @param transactionID
+ * @param recordInfo
+ */
+ void cleanedAddRecordTX(int recordPos, long transactionID, RecordInfo recordInfo) throws Exception;
+
+ /**
+ * @param recordPos
+ * @param recordInfo
+ */
+ void cleanedAddRecord(int recordPos, RecordInfo recordInfo) throws Exception;
+
+ /**
+ * @param recordInfo
+ * @throws Exception
+ */
+ void updateRecord(int recordPos, RecordInfo recordInfo) throws Exception;
+
+ /**
+ * @param recordID
+ */
+ void deleteRecord(int recordPos, long recordID) throws Exception;
+
+ /**
+ * @param transactionID
+ * @param recordInfo
+ * @throws Exception
+ */
+ void addRecordTX(int recordPos, long transactionID, RecordInfo recordInfo) throws Exception;
+
+ /**
+ * @param transactionID
+ * @param recordInfo
+ * @throws Exception
+ */
+ void updateRecordTX(int recordPos, long transactionID, RecordInfo recordInfo) throws Exception;
+
+ /**
+ * @param transactionID
+ * @param recordInfo
+ */
+ void deleteRecordTX(int recordPos, long transactionID, RecordInfo recordInfo) throws Exception;
+
+ /**
+ * @param transactionID
+ * @param extraData
+ * @param summaryData
+ */
+ void prepareRecord(int recordPos, long transactionID, byte[] extraData, byte[] summaryData) throws Exception;
+
+ /**
+ * @param transactionID
+ * @param summaryData
+ */
+ void commitRecord(int recordPos, long transactionID, byte[] summaryData) throws Exception;
+
+ /**
+ * @param transactionID
+ */
+ void rollbackRecord(int recordPos, long transactionID) throws Exception;
+
+ /**
+ *
+ */
+ void markAsDataFile();
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java 2009-02-14 00:11:52 UTC (rev 5863)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java 2009-02-14 01:49:45 UTC (rev 5864)
@@ -44,6 +44,7 @@
* <p>WIKI Page: <a href="http://wiki.jboss.org/wiki/JBossMessaging2Reclaiming">http://wiki.jboss.org/wiki/JBossMessaging2Reclaiming</a></p>
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
public class Reclaimer
@@ -57,14 +58,19 @@
log.trace(message);
}
- public void scan(final JournalFile[] files)
+ /** Returns the number of files holding second criterion (linked-list effect) */
+ public int scan(final JournalFile[] files)
{
+ int secondCriterionCount = 0;
+
for (int i = 0; i < files.length; i++)
{
- // First we evaluate criterion 1)
+ // First we evaluate criterion 1) (Which is simple reference counting)
JournalFile currentFile = files[i];
+ currentFile.setLinkedDependency(false);
+
int posCount = currentFile.getPosCount();
int totNeg = 0;
@@ -88,10 +94,14 @@
}
currentFile.setCanReclaim(true);
+
+ // This attribute would be helpful on calculating % usage of a file..
+ // And it is also useful for debugging
+ currentFile.setTotalNegCount(totNeg);
if (posCount <= totNeg)
{
- // Now we evaluate criterion 2)
+ // Now we evaluate criterion 2) (This file shouldn't have delete records on non reclaimable files)
for (int j = 0; j <= i; j++)
{
@@ -114,6 +124,11 @@
currentFile.setCanReclaim(false);
+ // This file is holding currentFile from being reclaimed, hence we set it as a linked dependency
+ file.setLinkedDependency(true);
+
+ secondCriterionCount++;
+
break;
}
}
@@ -124,5 +139,7 @@
currentFile.setCanReclaim(false);
}
}
+
+ return secondCriterionCount;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-02-14 00:11:52 UTC (rev 5863)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-02-14 01:49:45 UTC (rev 5864)
@@ -1127,7 +1127,8 @@
return;
}
- if (pagingStore == null)
+ // PagingManager would be null only on testcases
+ if (pagingStore == null && pagingManager != null)
{
// TODO: It would be better if we could initialize the pagingStore during the construction
try
@@ -1137,7 +1138,6 @@
catch (Exception e)
{
// This shouldn't happen, and if it happens, this shouldn't abort the route
- log.warn("Error getting the page-store Destination", e);
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2009-02-14 00:11:52 UTC (rev 5863)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2009-02-14 01:49:45 UTC (rev 5864)
@@ -705,12 +705,6 @@
ClientConsumer consumer = session.createConsumer(ADDRESS);
- // if (realFiles)
- // {
- // consumer.setLargeMessagesAsFiles(true);
- // consumer.setLargeMessagesDir(new File(clientLargeMessagesDir));
- // }
-
session.start();
for (int i = 0; i < 100; i++)
Added: trunk/tests/src/org/jboss/messaging/tests/integration/journal/JournalCleanupIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/JournalCleanupIntegrationTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/JournalCleanupIntegrationTest.java 2009-02-14 01:49:45 UTC (rev 5864)
@@ -0,0 +1,230 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.journal;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.persistence.impl.journal.JournalStorageManager;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.tests.integration.xa.BasicXaTest;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A JournalCleanupIntegrationTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Feb 13, 2009 6:55:48 PM
+ *
+ *
+ */
+public class JournalCleanupIntegrationTest extends ServiceTestBase
+{
+ private static Logger log = Logger.getLogger(BasicXaTest.class);
+
+ private final Map<String, AddressSettings> addressSettings = new HashMap<String, AddressSettings>();
+
+ private MessagingService messagingService;
+
+ private ClientSessionFactory sessionFactory;
+
+ private Configuration configuration;
+
+ private final SimpleString a1 = new SimpleString("a1");
+
+ private final SimpleString a2 = new SimpleString("a2");
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ clearData();
+ addressSettings.clear();
+ configuration = createDefaultConfig();
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setPagingDirectory(getPageDir());
+
+ messagingService = createService(true, configuration, addressSettings);
+
+ // start the server
+ messagingService.start();
+
+ sessionFactory = createInVMFactory();
+ ClientSession clientSession;
+ clientSession = sessionFactory.createSession(true, false, false);
+
+ clientSession.createQueue(a1, a1, null, true, true);
+ clientSession.createQueue(a2, a2, null, true, true);
+ clientSession.close();
+ }
+
+ protected void tearDown() throws Exception
+ {
+
+ if (messagingService != null && messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+
+ super.tearDown();
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testAutoCleanup() throws Exception
+ {
+ ClientSession clientSession = sessionFactory.createSession(false, true, true);
+
+ ClientSession sessionConsumer = sessionFactory.createSession(null, null, false, true, true, false, 0);
+
+ int NUMBER_OF_MESSAGES = 1000;
+
+ CountDownLatch latch = new CountDownLatch(NUMBER_OF_MESSAGES);
+
+ try
+ {
+ ClientProducer prod = clientSession.createProducer(a1);
+ prod.send(createTextMessage(clientSession, "hello"));
+ prod.close();
+
+ ClientConsumer cons = sessionConsumer.createConsumer(a2);
+
+ sessionConsumer.start();
+
+ LocalHandler handler = new LocalHandler(latch);
+
+ cons.setMessageHandler(handler);
+
+ prod = clientSession.createProducer(a2);
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+ {
+ prod.send(createBytesMessage(clientSession, new byte[1024], true));
+ }
+
+ latch.await(10, TimeUnit.SECONDS);
+
+ if (handler.e != null)
+ {
+ throw handler.e;
+ }
+
+ JournalStorageManager storage = (JournalStorageManager)messagingService.getServer().getStorageManager();
+ JournalImpl journal = (JournalImpl)storage.getMessageJournal();
+
+ // The cleanup is asynchronous, we keep trying the condition until a timeout of 15 seconds
+ for (long timeout = System.currentTimeMillis() + 15000; timeout > System.currentTimeMillis();)
+ {
+ // Wait the current task to finish before we test the condition again
+ journal.debugWait();
+
+ if (journal.getDataFilesCount() <= 3)
+ {
+ break;
+ }
+ }
+
+ assertTrue(journal.getDataFilesCount() <= 3);
+
+
+ cons.close();
+ cons = sessionConsumer.createConsumer(a1);
+
+ ClientMessage mess = cons.receive(1000);
+
+ assertNotNull(mess);
+ mess.acknowledge();
+ sessionConsumer.commit();
+
+ journal.forceMoveNextFile();
+ journal.checkAndReclaimFiles();
+
+ assertEquals(0, journal.getDataFilesCount());
+ }
+ finally
+ {
+ clientSession.close();
+ sessionConsumer.close();
+ }
+
+ }
+
+ class LocalHandler implements MessageHandler
+ {
+ Exception e;
+
+ CountDownLatch latch;
+
+ LocalHandler(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ public void onMessage(ClientMessage message)
+ {
+ try
+ {
+ message.acknowledge();
+ latch.countDown();
+ }
+ catch (MessagingException e)
+ {
+ this.e = e;
+ }
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/FakeJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/FakeJournalImplTest.java 2009-02-14 00:11:52 UTC (rev 5863)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/FakeJournalImplTest.java 2009-02-14 01:49:45 UTC (rev 5864)
@@ -30,6 +30,7 @@
* A FakeJournalImplTest
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
public class FakeJournalImplTest extends JournalImplTestUnit
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-02-14 00:11:52 UTC (rev 5863)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-02-14 01:49:45 UTC (rev 5864)
@@ -168,9 +168,14 @@
journal.stop();
}
-
+
protected void loadAndCheck() throws Exception
{
+ loadAndCheck(false);
+ }
+
+ protected void loadAndCheck(boolean printDebugJournal) throws Exception
+ {
List<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
@@ -178,6 +183,11 @@
journal.load(committedRecords, preparedTransactions);
checkRecordsEquivalent(records, committedRecords);
+
+ if (printDebugJournal)
+ {
+ printJournalLists(records, committedRecords);
+ }
// check prepared transactions
@@ -430,6 +440,11 @@
protected void checkRecordsEquivalent(final List<RecordInfo> expected, final List<RecordInfo> actual)
{
+ if (expected.size() != actual.size())
+ {
+ printJournalLists(expected, actual);
+ }
+
assertEquals("Lists not same length", expected.size(), actual.size());
Iterator<RecordInfo> iterExpected = expected.iterator();
@@ -442,6 +457,11 @@
RecordInfo ractual = iterActual.next();
+ if (rexpected.id != ractual.id || rexpected.isUpdate != ractual.isUpdate)
+ {
+ printJournalLists(expected, actual);
+ }
+
assertEquals("ids not same", rexpected.id, ractual.id);
assertEquals("type not same", rexpected.isUpdate, ractual.isUpdate);
@@ -450,6 +470,26 @@
}
}
+ /**
+ * @param expected
+ * @param actual
+ */
+ private void printJournalLists(final List<RecordInfo> expected, final List<RecordInfo> actual)
+ {
+ System.out.println("***********************************************");
+ System.out.println("Expected list:");
+ for (RecordInfo info : expected)
+ {
+ System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
+ }
+ System.out.println("Actual list:");
+ for (RecordInfo info : actual)
+ {
+ System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
+ }
+ System.out.println("***********************************************");
+ }
+
protected byte[] generateRecord(final int length)
{
byte[] record = new byte[length];
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-02-14 00:11:52 UTC (rev 5863)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-02-14 01:49:45 UTC (rev 5864)
@@ -668,15 +668,9 @@
public void testReclaimAddUpdateDeleteDifferentFiles1() throws Exception
{
+ // Make sure there is one record per file
setup(2, calculateRecordSize(8, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
- getAlignment()), true); // Make
- // sure
- // there
- // is
- // one
- // record
- // per
- // file
+ getAlignment()), true);
createJournal();
startJournal();
load();
@@ -713,15 +707,10 @@
public void testReclaimAddUpdateDeleteDifferentFiles2() throws Exception
{
+ // Make sure there is one record per file
setup(2, calculateRecordSize(8, getAlignment()) + calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
- getAlignment()), true); // Make
- // sure
- // there
- // is
- // one
- // record
- // per
- // file
+ getAlignment()), true);
+
createJournal();
startJournal();
load();
@@ -2172,6 +2161,24 @@
loadAndCheck();
}
+ public void testSimpleAddUpdateDeleteTransactional() throws Exception
+ {
+ setup(10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ addTx(1, 1);
+ commit(1);
+ updateTx(2, 1);
+ commit(2);
+ deleteTx(3, 1);
+ commit(3);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
public void testMultipleAddUpdateDelete() throws Exception
{
setup(10, 10 * 1024, true);
@@ -2937,7 +2944,7 @@
public void testReclaimAfterUpdate() throws Exception
{
- setup(2, 40 * 1024, true);
+ setup(2, 60 * 1024, true);
createJournal();
startJournal();
@@ -2945,17 +2952,30 @@
for (int i = 0; i < 100; i++)
{
- if (i % 10 == 0 && i>0)
+ add(i);
+ if (i % 10 == 0 && i > 0)
{
System.out.println("new file at " + i);
journal.forceMoveNextFile();
}
+ update(i);
- add(i);
+ }
+
+ for (int i = 0; i < 100; i++)
+ {
+
+ addTx(i, i + 100);
+ updateTx(i + 100);
+ if (i % 10 == 0 && i > 0)
+ {
+ System.out.println("new file at " + i);
+ journal.forceMoveNextFile();
+ }
+ commit(i);
update(i);
-
}
-
+
System.out.println("Before stop ****************************");
System.out.println(journal.debug());
System.out.println("*****************************************");
@@ -2971,7 +2991,7 @@
journal.forceMoveNextFile();
- for (int i = 0; i < 100; i++)
+ for (int i = 0; i < 200; i++)
{
delete(i);
}
@@ -2982,23 +3002,568 @@
System.out.println(journal.debug());
System.out.println("*****************************************");
-
journal.checkAndReclaimFiles();
System.out.println("After reclaim ****************************");
System.out.println(journal.debug());
System.out.println("*****************************************");
+ assertEquals(0, journal.getDataFilesCount());
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ assertEquals(0, journal.getDataFilesCount());
+ }
+
+ public void testCleanupNonTransactional() throws Exception
+ {
+ setup(2, 20 * 1024, true);
+
+ createJournal();
+ startJournal();
+ load();
+
+ add(1, 2, 3, 4, 5, 6, 7, 8, 9);
+ add(50);
+ delete(50);
+ System.out.println("Data = " + journal.getDataFilesCount());
+
+ journal.forceMoveNextFile();
+
+ update(1, 2, 3, 4, 5, 6, 7, 8);
+
+ journal.forceMoveNextFile();
+
+ System.out.println("Data = " + journal.getDataFilesCount());
+
+ delete(1, 2, 3, 4, 5, 6, 7, 8);
+
+ add(10, 11, 12, 13, 14, 15);
+ System.out.println("Data = " + journal.getDataFilesCount());
+
+ journal.forceMoveNextFile();
+
+ System.out.println("Data = " + journal.getDataFilesCount());
+ delete(10, 11, 12, 13, 14, 15);
+
+ System.out.println("Data = " + journal.getDataFilesCount());
+ journal.checkAndReclaimFiles();
+ System.out.println("Data = " + journal.getDataFilesCount());
+
+ System.out.println("Before ********************************************");
+ System.out.println(journal.debug());
+
+ journal.forceMoveNextFile();
+
+ System.out.println("After ********************************************");
+
+ System.out.println("Journal: " + journal.debug());
+
+ journal.checkAndReclaimFiles();
+ System.out.println("Data = " + journal.getDataFilesCount());
+
+ log.debug("Debug on Journal before stopJournal - \n" + debugJournal());
+
+ journal.cleanup(1);
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ assertEquals(0, journal.getDataFilesCount());
+
+ add(99);
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ assertEquals(0, journal.getDataFilesCount());
+
+ delete(99);
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ assertEquals(0, journal.getDataFilesCount());
+
+ }
+
+ public void testCleanupTransactional() throws Exception
+ {
+ setup(2, 20 * 1024, true);
+
+ createJournal();
+ startJournal();
+ load();
+
+ // File 1
+ {
+ addTx(1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ System.out.println("Data = " + journal.getDataFilesCount());
+
+ // 9 positives
+ }
+
+ journal.forceMoveNextFile();
+
+ System.out.println("File 1: ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ // File 2
+ {
+ addTx(1, 10, 11, 12, 13, 14, 15);
+ commit(1);
+ add(16);
+ }
+
+ journal.forceMoveNextFile();
+
+ System.out.println("File 2: ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ // File 3
+ {
+ delete(16);
+ update(1, 2, 3, 4, 5, 6, 7, 8);
+ delete(10, 11, 12, 13, 14, 15);
+ }
+
+ journal.forceMoveNextFile();
+
+ System.out.println("File 3: ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ // file 4
+ {
+ delete(1, 2, 3, 4, 5, 6, 7, 8);
+ }
+
+ journal.forceMoveNextFile();
+
+ System.out.println("File 4: ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ journal.cleanup(1);
+
+ System.out.println("After Cleanup 1: ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ journal.cleanup(2);
+
+ journal.checkAndReclaimFiles();
+ System.out.println("After Reclaim on Cleanup 1: ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
stopJournal();
createJournal();
startJournal();
loadAndCheck();
+ System.out.println("After reload ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ // assertEquals(2, journal.getDataFilesCount());
+
+ delete(9);
+
+ journal.forceMoveNextFile();
+
+ journal.checkAndReclaimFiles();
+
+ System.out.println("journal = " + journal.getDataFilesCount());
+
+ add(100);
+ update(100);
+ delete(100);
+
+ journal.forceMoveNextFile();
+
+ System.out.println("After add-update-delete on same file ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ journal.checkAndReclaimFiles();
+
+ System.out.println("Final ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ }
+
+ public void testCleanupWithDeleteUpdatesDifferentFiles() throws Exception
+ {
+ setup(2, 20 * 1024, true);
+
+ createJournal();
+ startJournal();
+ load();
+
+ // File 1
+ {
+ addTx(1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ commit(1);
+ }
+
+ journal.forceMoveNextFile();
+
+ System.out.println("File 1: ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ // File 2
+ {
+ updateTx(2, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ commit(2);
+ }
+
+ journal.forceMoveNextFile();
+
+ System.out.println("File 2: ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ // File 3
+ {
+ delete(1, 2, 3, 4, 5, 6, 7, 8);
+ }
+
+ journal.forceMoveNextFile();
+
+ System.out.println("Holding for secondLevel = " + journal.checkAndReclaimFiles());
+
+ assertTrue(journal.getJournalFile(1).isLinkedDependency());
+ assertFalse(journal.getJournalFile(2).isLinkedDependency());
+ assertFalse(journal.getJournalFile(3).isLinkedDependency());
+
+ System.out.println("File 3: ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ journal.cleanup(1);
+
+ System.out.println("After Cleanup 1: ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ journal.checkAndReclaimFiles();
+
+ delete(9);
+
+ journal.forceMoveNextFile();
+
+ journal.checkAndReclaimFiles();
+
+ System.out.println("Final ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+ public void testCleanupWithoutDeleteUpdatesDifferentFiles() throws Exception
+ {
+ setup(2, 20 * 1024, true);
+
+ createJournal();
+ startJournal();
+ load();
+
+ // File 1
+ {
+ addTx(1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ commit(1);
+ }
+
+ journal.forceMoveNextFile();
+
+ System.out.println("File 1: ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ // File 2
+ {
+ updateTx(2, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ commit(2);
+ }
+
+ journal.forceMoveNextFile();
+
+ System.out.println("File 2: ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ // File 3
+ {
+ delete(1, 2, 3, 4, 5, 6, 7, 8);
+ }
+
+ journal.forceMoveNextFile();
+
+ System.out.println("File 3: ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ journal.cleanup(1);
+
+ System.out.println("After Cleanup 1: ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ journal.cleanup(2);
+
+ journal.forceMoveNextFile();
+
+ journal.checkAndReclaimFiles();
+
+ System.out.println("Final ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ assertEquals(1, journal.getDataFilesCount());
+
+ delete(9);
+
+ journal.forceMoveNextFile();
+
+ journal.checkAndReclaimFiles();
+
assertEquals(0, journal.getDataFilesCount());
+
+ System.out.println("data:" + journal.getDataFilesCount());
+
+ System.out.println("After Delete ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
}
-
+ public void testCleanupWithUpdatesSameFiles() throws Exception
+ {
+ setup(2, 20 * 1024, true);
+
+ createJournal();
+ startJournal();
+ load();
+
+ // File 1
+ {
+ addTx(1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ commit(1);
+ update(1, 2, 3, 4, 5, 6, 7, 8);
+ }
+
+ journal.forceMoveNextFile();
+
+ System.out.println("File 1: ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ // File 2
+ {
+ delete(1, 2, 3, 4, 5, 6, 7, 8);
+ }
+
+ journal.forceMoveNextFile();
+
+ System.out.println("File 2: ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ journal.cleanup(1);
+
+ System.out.println("After Cleanup 1: ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ journal.forceMoveNextFile();
+ add(20);
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ System.out.println("Final ********************************************");
+ System.out.println(journal.debug());
+ System.out.println("***************************************************");
+
+ }
+
+ public void testCleanupWholeFile() throws Exception
+ {
+ setup(2, 20 * 1024, true);
+
+ createJournal();
+ startJournal();
+ load();
+
+ add(50);
+ delete(50);
+
+ addTx(1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ System.out.println("Data = " + journal.getDataFilesCount());
+
+ journal.forceMoveNextFile();
+
+ addTx(1, 10, 11, 12, 13, 14, 15);
+
+ commit(1);
+
+ journal.forceMoveNextFile();
+
+ delete(1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+ journal.forceMoveNextFile();
+
+ journal.checkAndReclaimFiles();
+
+ assertEquals(1, journal.getDataFilesCount());
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ delete(10, 11, 12, 13, 14, 15);
+ journal.checkAndReclaimFiles();
+ assertEquals(0, journal.getDataFilesCount());
+
+ System.out.println("Second one **********************************");
+ System.out.println(journal.debug());
+
+ assertEquals(0, journal.getDataFilesCount());
+
+ }
+
+ public void testAutomaticCleanup() throws Exception
+ {
+ setup(2, 3 * 1024, true);
+
+ createJournal();
+ startJournal();
+ load();
+
+ add(50);
+ delete(50);
+
+ addTx(1, 1, 2);
+ journal.forceMoveNextFile();
+ updateTx(1, 1, 2);
+ journal.forceMoveNextFile();
+ commit(1);
+ System.out.println("Data = " + journal.getDataFilesCount());
+
+ journal.forceMoveNextFile();
+
+ delete(1);
+
+ for (int i = 10; i < 20; i++)
+ {
+ add(i);
+ journal.forceMoveNextFile();
+ update(i);
+ journal.forceMoveNextFile();
+ delete(i);
+ }
+
+ journal.forceMoveNextFile();
+
+ add(100);
+ update(100);
+
+ for (int i = 101; i < 120; i++)
+ {
+ add(i);
+ journal.forceMoveNextFile();
+ update(i);
+ journal.forceMoveNextFile();
+ delete(i);
+ }
+
+ journal.checkAndReclaimFiles();
+
+ journal.setAutoReclaim(true);
+ journal.forceMoveNextFile();
+
+
+ // The cleanup is asynchronous, we keep trying the condition until a timeout of 5 seconds
+ for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis();)
+ {
+ // Wait the current task to finish before we test the condition again
+ journal.debugWait();
+
+ if (journal.getDataFilesCount() == 4)
+ {
+ break;
+ }
+ }
+
+ assertEquals(4, journal.getDataFilesCount());
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ delete(2, 100);
+
+ // moving to the next file, so any deletes will already make way on reclaiming
+ journal.forceMoveNextFile();
+
+ journal.checkAndReclaimFiles();
+
+ assertEquals(0, journal.getDataFilesCount());
+
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ assertEquals(0, journal.getDataFilesCount());
+
+ }
+
protected abstract int getAlignment();
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java 2009-02-14 00:11:52 UTC (rev 5863)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java 2009-02-14 01:49:45 UTC (rev 5864)
@@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -32,6 +33,7 @@
import org.jboss.messaging.core.journal.impl.Reclaimer;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.Pair;
/**
*
@@ -748,6 +750,8 @@
private boolean canDelete;
+ private boolean linkedDependency;
+
public void extendOffset(final int delta)
{
}
@@ -794,6 +798,18 @@
negCounts.put(file, c);
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.impl.JournalFile#decNegCount(org.jboss.messaging.core.journal.impl.JournalFile)
+ */
+ public void decNegCount(JournalFile file)
+ {
+ Integer count = negCounts.get(file);
+
+ int c = count == null ? 1 : count.intValue() - 1;
+
+ negCounts.put(file, c);
+ }
+
public int getPosCount()
{
return posCount;
@@ -863,5 +879,54 @@
{
return transactionIDs;
}
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.impl.JournalFile#getTotalNegCount()
+ */
+ public int getTotalNegCount()
+ {
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.impl.JournalFile#setTotalNegCount(int)
+ */
+ public void setTotalNegCount(int total)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.impl.JournalFile#addCleanupInfo(long, org.jboss.messaging.core.journal.impl.JournalFile)
+ */
+ public void addCleanupInfo(long id, JournalFile deleteFile)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.impl.JournalFile#getCleanupInfo(long)
+ */
+ public JournalFile getCleanupInfo(long id)
+ {
+ return null;
+ }
+
+ /**
+ * @return the linkedDependency
+ */
+ public boolean isLinkedDependency()
+ {
+ return linkedDependency;
+ }
+
+ /**
+ * @param linkedDependency the linkedDependency to set
+ */
+ public void setLinkedDependency(boolean linkedDependency)
+ {
+ this.linkedDependency = linkedDependency;
+ }
+
+
+
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/util/ListJournal.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ListJournal.java 2009-02-14 00:11:52 UTC (rev 5863)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ListJournal.java 2009-02-14 01:49:45 UTC (rev 5864)
@@ -93,14 +93,15 @@
System.out.println("Total of " + records.size() + " committed records");
-// remove this comment this if you need to verify the content
-// for (RecordInfo record: records)
-// {
-// System.out.println("user record: " + record.userRecordType + " id: " + record.id + " isUpdated: " + record.isUpdate);
-// }
+ for (RecordInfo record: records)
+ {
+ System.out.println("user record: " + record);
+ }
journal.checkAndReclaimFiles();
+ System.out.println("Data = " + journal.debug());
+
journal.stop();
More information about the jboss-cvs-commits
mailing list