[jboss-cvs] JBoss Messaging SVN: r7504 - in branches/clebert_temp_expirement: src/main/org/jboss/messaging/core/journal/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 30 18:30:19 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-30 18:30:18 -0400 (Tue, 30 Jun 2009)
New Revision: 7504
Added:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReaderCallbackAbstract.java
Modified:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/SequentialFile.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Checking for control file on journal startup
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2009-06-30 17:42:50 UTC (rev 7503)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2009-06-30 22:30:18 UTC (rev 7504)
@@ -42,6 +42,8 @@
void open() throws Exception;
boolean isOpen();
+
+ boolean exists();
/**
* For certain operations (like loading) we don't need open the file with full maxIO
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-30 17:42:50 UTC (rev 7503)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-30 22:30:18 UTC (rev 7504)
@@ -116,6 +116,11 @@
return aioFile.getBlockSize();
}
+
+ public boolean exists()
+ {
+ return file.exists();
+ }
public int calculateBlockStart(final int position) throws Exception
{
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java 2009-06-30 17:42:50 UTC (rev 7503)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java 2009-06-30 22:30:18 UTC (rev 7504)
@@ -38,6 +38,7 @@
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.impl.JournalImpl.JournalRecord;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.utils.ConcurrentHashSet;
import org.jboss.messaging.utils.DataConstants;
@@ -51,6 +52,8 @@
public class JournalCompactor implements JournalReaderCallback
{
+ private static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr";
+
private static final Logger log = Logger.getLogger(JournalCompactor.class);
private final JournalImpl journal;
@@ -63,7 +66,7 @@
private int fileID;
- private ChannelBuffer channelWrapper;
+ private ChannelBuffer writingChannel;
private int nextOrderingID;
@@ -83,6 +86,135 @@
* we cache those updates during compacting. As soon as we are done we take the right account. */
private final LinkedList<CompactCommand> pendingCommands = new LinkedList<CompactCommand>();
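+ /**
+ * @param fileFactory factory used to create the control file
+ * @param files the data files to be deleted; their names are written first
+ * @param newFiles the files to be renamed; their names are written after the data files
+ */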
+ public static SequentialFile writeControlFile(final SequentialFileFactory fileFactory,
+ final List<JournalFile> files,
+ final List<JournalFile> newFiles) throws Exception
+ {
+
+ System.out.println("****************************** controlFiles (deleted)");
+
+ for (JournalFile file : files)
+ {
+ System.out.println("To be deleted:" + file.getFile().getFileName());
+ }
+
+ System.out.println("****************************** controlFiles (renamed)");
+
+ for (JournalFile file : newFiles)
+ {
+ System.out.println("To be renamed:" + file.getFile().getFileName());
+ }
+
+ SequentialFile controlFile = fileFactory.createSequentialFile(FILE_COMPACT_CONTROL, 1);
+
+ try
+ {
+ controlFile.open(1);
+
+ ChannelBuffer renameBuffer = ChannelBuffers.dynamicBuffer(1);
+
+ renameBuffer.writeInt(-1);
+ renameBuffer.writeInt(-1);
+
+ MessagingBuffer filesToRename = ChannelBuffers.dynamicBuffer(1);
+
+ // DataFiles first
+
+ filesToRename.writeInt(files.size());
+
+ for (JournalFile file : files)
+ {
+ filesToRename.writeUTF(file.getFile().getFileName());
+ }
+
+ filesToRename.writeInt(newFiles.size());
+
+ for (JournalFile file : newFiles)
+ {
+ filesToRename.writeUTF(file.getFile().getFileName());
+ }
+
+ JournalImpl.writeAddRecord(-1,
+ 1,
+ (byte)0,
+ new JournalImpl.ByteArrayEncoding(filesToRename.array()),
+ JournalImpl.SIZE_ADD_RECORD + filesToRename.array().length,
+ renameBuffer);
+
+ ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
+
+ writeBuffer.put(renameBuffer.array(), 0, renameBuffer.writerIndex());
+
+ writeBuffer.rewind();
+
+ controlFile.write(writeBuffer, true);
+
+ return controlFile;
+ }
+ finally
+ {
+ controlFile.close();
+ }
+ }
+
+ public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
+ final List<String> dataFiles,
+ final List<String> newFiles) throws Exception
+ {
+ SequentialFile controlFile = fileFactory.createSequentialFile(FILE_COMPACT_CONTROL, 1);
+
+ if (controlFile.exists())
+ {
+ JournalFile file = new JournalFileImpl(controlFile, -1, -1);
+
+ final ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
+
+ JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallbackAbstract()
+ {
+ @Override
+ public void onReadAddRecord(final RecordInfo info) throws Exception
+ {
+ records.add(info);
+ }
+ });
+
+ if (records.size() == 0)
+ {
+ return null;
+ }
+ else
+ {
+ ChannelBuffer input = ChannelBuffers.wrappedBuffer(records.get(0).data);
+
+ int numberDataFiles = input.readInt();
+
+ for (int i = 0; i < numberDataFiles; i++)
+ {
+ dataFiles.add(input.readUTF());
+ }
+
+ int numberNewFiles = input.readInt();
+
+ for (int i = 0; i < numberNewFiles; i++)
+ {
+ newFiles.add(input.readUTF());
+ }
+
+ }
+
+ return controlFile;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
public List<JournalFile> getNewDataFiles()
{
return newDataFiles;
@@ -136,14 +268,20 @@
/** If a delete comes for these records, while the compactor still working, we need to be able to take them into account for later deletes
* instead of throwing exceptions about non existent records */
- for (long id : ids)
+ if (ids != null)
{
- recordsSnapshot.add(id);
+ for (long id : ids)
+ {
+ recordsSnapshot.add(id);
+ }
}
- for (long id : ids2)
+ if (ids2 != null)
{
- recordsSnapshot.add(id);
+ for (long id : ids2)
+ {
+ recordsSnapshot.add(id);
+ }
}
}
@@ -180,13 +318,13 @@
private void checkSize(final int size) throws Exception
{
- if (channelWrapper == null)
+ if (writingChannel == null)
{
openFile();
}
else
{
- if (channelWrapper.writerIndex() + size > channelWrapper.capacity())
+ if (writingChannel.writerIndex() + size > writingChannel.capacity())
{
openFile();
}
@@ -196,15 +334,15 @@
/** Write pending output into file */
public void flush() throws Exception
{
- if (channelWrapper != null)
+ if (writingChannel != null)
{
sequentialFile.position(0);
- sequentialFile.write(channelWrapper.toByteBuffer(), true);
+ sequentialFile.write(writingChannel.toByteBuffer(), true);
sequentialFile.close();
newDataFiles.add(currentFile);
}
- channelWrapper = null;
+ writingChannel = null;
}
/**
@@ -238,12 +376,12 @@
checkSize(size);
- journal.writeAddRecord(fileID,
- info.id,
- info.getUserRecordType(),
- new JournalImpl.ByteArrayEncoding(info.data),
- size,
- channelWrapper);
+ JournalImpl.writeAddRecord(fileID,
+ info.id,
+ info.getUserRecordType(),
+ new JournalImpl.ByteArrayEncoding(info.data),
+ size,
+ writingChannel);
newRecords.put(info.id, new JournalRecord(currentFile));
}
@@ -261,13 +399,13 @@
newTransaction.addPositive(currentFile, info.id);
- journal.writeAddRecordTX(fileID,
- transactionID,
- info.id,
- info.getUserRecordType(),
- new JournalImpl.ByteArrayEncoding(info.data),
- size,
- channelWrapper);
+ JournalImpl.writeAddRecordTX(fileID,
+ transactionID,
+ info.id,
+ info.getUserRecordType(),
+ new JournalImpl.ByteArrayEncoding(info.data),
+ size,
+ writingChannel);
}
else
{
@@ -306,12 +444,12 @@
checkSize(size);
- journal.writeDeleteRecordTransactional(fileID,
- transactionID,
- info.id,
- new JournalImpl.ByteArrayEncoding(info.data),
- size,
- channelWrapper);
+ JournalImpl.writeDeleteRecordTransactional(fileID,
+ transactionID,
+ info.id,
+ new JournalImpl.ByteArrayEncoding(info.data),
+ size,
+ writingChannel);
newTransaction.addNegative(currentFile, info.id);
}
@@ -334,14 +472,14 @@
checkSize(size);
- journal.writeTransaction(fileID,
- JournalImpl.PREPARE_RECORD,
- transactionID,
- newTransaction,
- new JournalImpl.ByteArrayEncoding(extraData),
- size,
- newTransaction.getCounter(currentFile),
- channelWrapper);
+ JournalImpl.writeTransaction(fileID,
+ JournalImpl.PREPARE_RECORD,
+ transactionID,
+ newTransaction,
+ new JournalImpl.ByteArrayEncoding(extraData),
+ size,
+ newTransaction.getCounter(currentFile),
+ writingChannel);
newTransaction.prepare(currentFile);
@@ -372,16 +510,18 @@
{
log.warn("Couldn't find addRecord information for record " + info.id + " during compacting");
}
+ else
+ {
+ newRecord.addUpdateFile(currentFile);
+ }
- journal.writeUpdateRecord(fileID,
- info.id,
- info.userRecordType,
- new JournalImpl.ByteArrayEncoding(info.data),
- size,
- channelWrapper);
+ JournalImpl.writeUpdateRecord(fileID,
+ info.id,
+ info.userRecordType,
+ new JournalImpl.ByteArrayEncoding(info.data),
+ size,
+ writingChannel);
- newRecord.addUpdateFile(currentFile);
-
}
}
@@ -395,13 +535,13 @@
checkSize(size);
- journal.writeUpdateRecordTX(fileID,
- transactionID,
- info.id,
- info.userRecordType,
- new JournalImpl.ByteArrayEncoding(info.data),
- size,
- channelWrapper);
+ JournalImpl.writeUpdateRecordTX(fileID,
+ transactionID,
+ info.id,
+ info.userRecordType,
+ new JournalImpl.ByteArrayEncoding(info.data),
+ size,
+ writingChannel);
newTransaction.addPositive(currentFile, info.id);
}
@@ -434,7 +574,7 @@
flush();
ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize());
- channelWrapper = ChannelBuffers.wrappedBuffer(bufferWrite);
+ writingChannel = ChannelBuffers.wrappedBuffer(bufferWrite);
currentFile = journal.getFile(false, false, false);
sequentialFile = currentFile.getFile();
@@ -443,8 +583,8 @@
fileID = nextOrderingID++;
currentFile = new JournalFileImpl(sequentialFile, fileID, fileID);
- channelWrapper.writeInt(fileID);
- channelWrapper.writeInt(fileID);
+ writingChannel.writeInt(fileID);
+ writingChannel.writeInt(fileID);
}
private static abstract class CompactCommand
@@ -473,7 +613,7 @@
}
}
- private class PendingTransaction
+ private static class PendingTransaction
{
long pendingIDs[];
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-30 17:42:50 UTC (rev 7503)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-30 22:30:18 UTC (rev 7504)
@@ -107,8 +107,6 @@
// log.trace(message);
}
- private static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr";
-
// The sizes of primitive types
public static final int MIN_FILE_SIZE = 1024;
@@ -117,25 +115,37 @@
public static final int BASIC_SIZE = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_INT;
- public static final int SIZE_ADD_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT /* + record.length */;
+ public static final int SIZE_ADD_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_INT /* + record.length */;
// Record markers - they must be all unique
public static final byte ADD_RECORD = 11;
- public static final byte SIZE_UPDATE_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT /* + record.length */;
+ public static final byte SIZE_UPDATE_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_INT /* + record.length */;
public static final byte UPDATE_RECORD = 12;
- public static final int SIZE_ADD_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT /* + record.length */;
+ public static final int SIZE_ADD_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_INT /* + record.length */;
public static final byte ADD_RECORD_TX = 13;
- public static final int SIZE_UPDATE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT /* + record.length */;
+ public static final int SIZE_UPDATE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG +
+ DataConstants.SIZE_BYTE +
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_INT /* + record.length */;
public static final byte UPDATE_RECORD_TX = 14;
- public static final int SIZE_DELETE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + DataConstants.SIZE_INT /* + record.length */;
+ public static final int SIZE_DELETE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG +
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_INT /* + record.length */;
public static final byte DELETE_RECORD_TX = 15;
@@ -143,7 +153,8 @@
public static final byte DELETE_RECORD = 16;
- public static final int SIZE_COMPLETE_TRANSACTION_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
+ public static final int SIZE_COMPLETE_TRANSACTION_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG +
+ DataConstants.SIZE_INT;
public static final int SIZE_PREPARE_RECORD = SIZE_COMPLETE_TRANSACTION_RECORD + DataConstants.SIZE_INT;
@@ -258,14 +269,14 @@
this.maxAIO = maxAIO;
}
-
+
// Public methods (used by package members) (those are not part of the JournalImpl interface)
-
+
public Map<Long, JournalRecord> getRecords()
{
return records;
}
-
+
public JournalFile getCurrentFile()
{
return currentFile;
@@ -839,7 +850,6 @@
throw new IllegalStateException("There is pending compacting operation");
}
-
ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(dataFiles.size());
boolean previousReclaimValue = autoReclaim;
@@ -859,12 +869,15 @@
autoReclaim = false;
// Take the snapshots and replace the structures
-
+
dataFilesToProcess.addAll(dataFiles);
dataFiles.clear();
- this.compactor = new JournalCompactor(fileFactory, this, this.records.keySet(), dataFilesToProcess.get(0).getFileID());
+ this.compactor = new JournalCompactor(fileFactory,
+ this,
+ this.records.keySet(),
+ dataFilesToProcess.get(0).getFileID());
for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
{
@@ -872,7 +885,8 @@
entry.getValue().setCompacting();
}
- // We will calculate the new records during compacting, what will take the position the records will take after compacting
+ // We will calculate the new records during compacting, what will take the position the records will take
+ // after compacting
this.records.clear();
}
finally
@@ -880,7 +894,8 @@
compactingLock.writeLock().unlock();
}
- // Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as well
+ // Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as
+ // well
JournalFile previousFile = null;
for (final JournalFile file : dataFilesToProcess)
{
@@ -895,7 +910,7 @@
previousFile = file;
log.info("Compacting file " + file.getFile().getFileName() + ", internalID = " + file.getFileID());
- readJournalFile(file, compactor);
+ readJournalFile(fileFactory, file, compactor);
}
compactor.flush();
@@ -910,7 +925,7 @@
List<JournalFile> newDatafiles = null;
JournalCompactor localCompactor = compactor;
-
+
compactingLock.writeLock().lock();
try
{
@@ -935,31 +950,31 @@
}
dataFiles.addFirst(fileToAdd);
}
-
// Replay pending commands (including updates, deletes and commits)
-
+
localCompactor.replayPendingCommands();
// Merge transactions back after compacting
-
+
for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values())
{
- if (trace)
+ if (trace)
{
- trace("Merging pending transaction " + newTransaction + " after compacting to the journal");
+ trace("Merging pending transaction " + newTransaction + " after compacting to the journal");
}
JournalTransaction liveTransaction = this.transactions.get(newTransaction.getId());
if (liveTransaction == null)
{
- log.warn("Inconsistency: Can't merge transaction " + newTransaction.getId() + " back into JournalTransactions");
+ log.warn("Inconsistency: Can't merge transaction " + newTransaction.getId() +
+ " back into JournalTransactions");
}
else
{
liveTransaction.merge(newTransaction);
}
}
-
+
for (Map.Entry<Long, JournalRecord> record : records.entrySet())
{
trace("We have " + record.getKey() + " on the list now");
@@ -971,6 +986,7 @@
compactingLock.writeLock().unlock();
}
+ // At this point the journal is unlocked. We keep renaming files while the journal is already operational
renameFiles(dataFilesToProcess, newDatafiles);
deleteControlFile(controlFile);
@@ -1032,44 +1048,10 @@
*/
protected SequentialFile createControlFile(List<JournalFile> files, List<JournalFile> newFiles) throws Exception
{
- SequentialFile tmpRenameFile = fileFactory.createSequentialFile(FILE_COMPACT_CONTROL, 1);
-
- tmpRenameFile.open();
-
- ChannelBuffer renameBuffer = ChannelBuffers.dynamicBuffer(1);
-
- renameBuffer.writeInt(-1);
- renameBuffer.writeInt(-1);
-
- MessagingBuffer filesToRename = ChannelBuffers.dynamicBuffer(1);
-
- // DataFiles first
-
- filesToRename.writeInt(files.size());
-
- for (JournalFile file : files)
- {
- filesToRename.writeUTF(file.getFile().getFileName());
- filesToRename.writeInt(file.getFileID()); // The File ID that needs to be renamed
- }
-
- filesToRename.writeInt(newFiles.size());
-
- for (JournalFile file : newFiles)
- {
- filesToRename.writeUTF(file.getFile().getFileName());
- }
-
- writeAddRecord(-1, 1, (byte)0, new ByteArrayEncoding(filesToRename.array()), SIZE_ADD_RECORD, renameBuffer);
-
- tmpRenameFile.write(renameBuffer, true);
-
- tmpRenameFile.close();
-
- return tmpRenameFile;
+ return JournalCompactor.writeControlFile(fileFactory, files, newFiles);
}
- private boolean isInvalidSize(int bufferPos, int size)
+ private static boolean isInvalidSize(int fileSize, int bufferPos, int size)
{
if (size < 0)
{
@@ -1125,6 +1107,8 @@
throw new IllegalStateException("Journal must be in started state");
}
+ checkControlFile();
+
final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
final List<JournalFile> orderedFiles = orderFiles();
@@ -1139,7 +1123,7 @@
final AtomicBoolean hasData = new AtomicBoolean(false);
- int resultLastPost = readJournalFile(file, new JournalReaderCallback()
+ int resultLastPost = readJournalFile(fileFactory, file, new JournalReaderCallback()
{
public void onReadAddRecord(RecordInfo info) throws Exception
@@ -1509,6 +1493,62 @@
return maxID;
}
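+ /**
+ * Completes a compacting interrupted during its renaming phase and deletes leftover ".cmp" files.
+ * @throws Exception propagated from the underlying file operations
+ */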
+ private void checkControlFile() throws Exception
+ {
+ ArrayList<String> dataFiles = new ArrayList<String>();
+ ArrayList<String> newFiles = new ArrayList<String>();
+
+ SequentialFile controlFile = JournalCompactor.readControlFile(fileFactory, dataFiles, newFiles);
+ if (controlFile != null)
+ {
+ log.info("Journal Compactor was interrupted during renaming phase, renaming files");
+
+ for (String dataFile : dataFiles)
+ {
+ SequentialFile file = fileFactory.createSequentialFile(dataFile, 1);
+ log.info("Removing old compacted file" + file.getFileName());
+ if (file.exists())
+ {
+ file.delete();
+ }
+ }
+
+ for (String newFile : newFiles)
+ {
+ SequentialFile file = fileFactory.createSequentialFile(newFile, 1);
+ log.info("Renaming file " + file.getFileName() + " as an part of the data files");
+ if (file.exists())
+ {
+ final String originalName = file.getFileName();
+ final String newName = originalName.substring(0, originalName.lastIndexOf(".cmp"));
+ file.renameTo(newName);
+ }
+ }
+
+ controlFile.delete();
+ }
+
+
+ List<String> leftFiles = fileFactory.listFiles(this.getFileExtension() + ".cmp");
+
+ if (leftFiles.size() > 0)
+ {
+ log.warn("Compacted files were left unnatended on journal directory, deleting invalid files now");
+
+ for (String fileToDelete : leftFiles)
+ {
+ SequentialFile file = fileFactory.createSequentialFile(fileToDelete, 1);
+ file.delete();
+ }
+ }
+
+ return;
+ }
+
public int getAlignment() throws Exception
{
return fileFactory.getAlignment();
@@ -1827,7 +1867,7 @@
return jf;
}
- public int readJournalFile(JournalFile file, JournalReaderCallback reader) throws Exception
+ public static int readJournalFile(SequentialFileFactory fileFactory, JournalFile file, JournalReaderCallback reader) throws Exception
{
file.getFile().open(1);
@@ -1837,9 +1877,9 @@
wholeFileBuffer = fileFactory.newBuffer((int)file.getFile().size());
- int bytesRead = file.getFile().read(wholeFileBuffer);
+ final int journalFileSize = file.getFile().read(wholeFileBuffer);
- if (bytesRead != file.getFile().size())
+ if (journalFileSize != file.getFile().size())
{
throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
}
@@ -1865,7 +1905,7 @@
continue;
}
- if (isInvalidSize(wholeFileBuffer.position(), DataConstants.SIZE_INT))
+ if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
{
reader.markAsDataFile(file);
@@ -1882,7 +1922,7 @@
if (isTransaction(recordType))
{
- if (isInvalidSize(wholeFileBuffer.position(), DataConstants.SIZE_LONG))
+ if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG))
{
wholeFileBuffer.position(pos + 1);
reader.markAsDataFile(file);
@@ -1897,7 +1937,7 @@
// If prepare or commit
if (!isCompleteTransaction(recordType))
{
- if (isInvalidSize(wholeFileBuffer.position(), DataConstants.SIZE_LONG))
+ if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG))
{
wholeFileBuffer.position(pos + 1);
reader.markAsDataFile(file);
@@ -1923,7 +1963,7 @@
if (isContainsBody(recordType))
{
- if (isInvalidSize(wholeFileBuffer.position(), DataConstants.SIZE_INT))
+ if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
{
wholeFileBuffer.position(pos + 1);
reader.markAsDataFile(file);
@@ -1932,7 +1972,7 @@
variableSize = wholeFileBuffer.getInt();
- if (isInvalidSize(wholeFileBuffer.position(), variableSize))
+ if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), variableSize))
{
wholeFileBuffer.position(pos + 1);
continue;
@@ -1969,7 +2009,7 @@
// VI - this is completing V, We will validate the size at the end
// of the record,
// But we avoid buffer overflows by damaged data
- if (isInvalidSize(pos, recordSize + variableSize + preparedTransactionExtraDataSize))
+ if (isInvalidSize(journalFileSize, pos, recordSize + variableSize + preparedTransactionExtraDataSize))
{
// Avoid a buffer overflow caused by damaged data... continue
// scanning for more pendingTransactions...
@@ -1995,7 +2035,10 @@
int oldPos = wholeFileBuffer.position();
- wholeFileBuffer.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - DataConstants.SIZE_INT);
+ wholeFileBuffer.position(pos + variableSize +
+ recordSize +
+ preparedTransactionExtraDataSize -
+ DataConstants.SIZE_INT);
int checkSize = wholeFileBuffer.getInt();
@@ -2182,14 +2225,14 @@
* @return
* @throws Exception
*/
- void writeTransaction(final int fileID,
- final byte recordType,
- final long txID,
- final JournalTransaction tx,
- final EncodingSupport transactionData,
- final int size,
- final int numberOfRecords,
- final ChannelBuffer bb) throws Exception
+ public static void writeTransaction(final int fileID,
+ final byte recordType,
+ final long txID,
+ final JournalTransaction tx,
+ final EncodingSupport transactionData,
+ final int size,
+ final int numberOfRecords,
+ final ChannelBuffer bb) throws Exception
{
bb.writeByte(recordType);
bb.writeInt(fileID); // skip ID part
@@ -2217,13 +2260,13 @@
* @param size
* @param bb
*/
- void writeUpdateRecordTX(final int fileID,
- final long txID,
- final long id,
- final byte recordType,
- final EncodingSupport record,
- int size,
- ChannelBuffer bb)
+ public static void writeUpdateRecordTX(final int fileID,
+ final long txID,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ int size,
+ ChannelBuffer bb)
{
bb.writeByte(UPDATE_RECORD_TX);
bb.writeInt(fileID);
@@ -2242,12 +2285,12 @@
* @param size
* @param bb
*/
- void writeUpdateRecord(final int fileId,
- final long id,
- final byte recordType,
- final EncodingSupport record,
- int size,
- ChannelBuffer bb)
+ public static void writeUpdateRecord(final int fileId,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ int size,
+ ChannelBuffer bb)
{
bb.writeByte(UPDATE_RECORD);
bb.writeInt(fileId);
@@ -2265,12 +2308,12 @@
* @param size
* @param bb
*/
- void writeAddRecord(final int fileId,
- final long id,
- final byte recordType,
- final EncodingSupport record,
- int size,
- ChannelBuffer bb)
+ public static void writeAddRecord(final int fileId,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ int size,
+ ChannelBuffer bb)
{
bb.writeByte(ADD_RECORD);
bb.writeInt(fileId);
@@ -2288,12 +2331,12 @@
* @param size
* @param bb
*/
- void writeDeleteRecordTransactional(final int fileID,
- final long txID,
- final long id,
- final EncodingSupport record,
- int size,
- ChannelBuffer bb)
+ public static void writeDeleteRecordTransactional(final int fileID,
+ final long txID,
+ final long id,
+ final EncodingSupport record,
+ int size,
+ ChannelBuffer bb)
{
bb.writeByte(DELETE_RECORD_TX);
bb.writeInt(fileID);
@@ -2316,13 +2359,13 @@
* @param size
* @param bb
*/
- void writeAddRecordTX(final int fileID,
- final long txID,
- final long id,
- final byte recordType,
- final EncodingSupport record,
- int size,
- ChannelBuffer bb)
+ public static void writeAddRecordTX(final int fileID,
+ final long txID,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ int size,
+ ChannelBuffer bb)
{
bb.writeByte(ADD_RECORD_TX);
bb.writeInt(fileID);
@@ -2334,24 +2377,24 @@
bb.writeInt(size);
}
- private boolean isTransaction(final byte recordType)
+ private static boolean isTransaction(final byte recordType)
{
return recordType == ADD_RECORD_TX || recordType == UPDATE_RECORD_TX ||
recordType == DELETE_RECORD_TX ||
isCompleteTransaction(recordType);
}
- private boolean isCompleteTransaction(final byte recordType)
+ private static boolean isCompleteTransaction(final byte recordType)
{
return recordType == COMMIT_RECORD || recordType == PREPARE_RECORD || recordType == ROLLBACK_RECORD;
}
- private boolean isContainsBody(final byte recordType)
+ private static boolean isContainsBody(final byte recordType)
{
return recordType >= ADD_RECORD && recordType <= DELETE_RECORD_TX;
}
- private int getRecordSize(final byte recordType)
+ private static int getRecordSize(final byte recordType)
{
// The record size (without the variable portion)
int recordSize = 0;
@@ -2572,14 +2615,14 @@
if (fill)
{
sequentialFile.fill(0, fileSize, FILL_CHARACTER);
-
+
ByteBuffer bb = fileFactory.newBuffer(SIZE_HEADER);
-
+
bb.putInt(fileID);
bb.putInt(fileID);
-
+
bb.rewind();
-
+
sequentialFile.write(bb, true);
}
Added: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReaderCallbackAbstract.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReaderCallbackAbstract.java (rev 0)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReaderCallbackAbstract.java 2009-06-30 22:30:18 UTC (rev 7504)
@@ -0,0 +1,78 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.journal.impl;
+
+import org.jboss.messaging.core.journal.RecordInfo;
+
+/**
+ * A JournalReaderCallback whose callbacks all do nothing.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Subclasses override only the callbacks they are interested in.
+ */
+public class JournalReaderCallbackAbstract implements JournalReaderCallback
+{
+
+ public void markAsDataFile(JournalFile file)
+ {
+ }
+
+ public void onReadAddRecord(RecordInfo info) throws Exception
+ {
+ }
+
+ public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ }
+
+ public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+ {
+ }
+
+ public void onReadDeleteRecord(long recordID) throws Exception
+ {
+ }
+
+ public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ }
+
+ public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+ {
+ }
+
+ public void onReadRollbackRecord(long transactionID) throws Exception
+ {
+ }
+
+ public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
+ {
+ }
+
+ public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ }
+
+}
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2009-06-30 17:42:50 UTC (rev 7503)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2009-06-30 22:30:18 UTC (rev 7504)
@@ -46,7 +46,7 @@
private static final Logger log = Logger.getLogger(NIOSequentialFile.class);
private File file;
-
+
private long fileSize = 0;
private final String directory;
@@ -54,7 +54,7 @@
private FileChannel channel;
private RandomAccessFile rfile;
-
+
private final AtomicLong position = new AtomicLong(0);
public NIOSequentialFile(final String directory, final String fileName)
@@ -63,11 +63,16 @@
file = new File(directory + "/" + fileName);
}
+ public boolean exists()
+ {
+ return file.exists(); // delegates to the exists() check of this object's 'file' field
+ }
+
public int getAlignment()
{
return 1;
}
-
+
public void flush()
{
}
@@ -76,7 +81,7 @@
{
return position;
}
-
+
public boolean fits(final int size)
{
return this.position.get() + size <= fileSize;
@@ -97,7 +102,7 @@
rfile = new RandomAccessFile(file, "rw");
channel = rfile.getChannel();
-
+
fileSize = channel.size();
}
@@ -124,10 +129,10 @@
channel.force(false);
channel.position(0);
-
+
fileSize = channel.size();
}
-
+
public synchronized void waitForClose() throws Exception
{
while (isOpen())
@@ -151,7 +156,7 @@
channel = null;
rfile = null;
-
+
notifyAll();
}
@@ -194,7 +199,6 @@
}
-
public void write(final MessagingBuffer bytes, final boolean sync) throws Exception
{
write(ByteBuffer.wrap(bytes.array()), sync);
@@ -212,7 +216,7 @@
channel.write(bytes);
if (sync)
- {
+ {
sync();
}
}
@@ -222,9 +226,9 @@
try
{
position.addAndGet(bytes.limit());
-
+
channel.write(bytes);
-
+
if (sync)
{
sync();
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java 2009-06-30 17:42:50 UTC (rev 7503)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java 2009-06-30 22:30:18 UTC (rev 7504)
@@ -24,9 +24,15 @@
import java.io.File;
import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
+import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalCompactor;
+import org.jboss.messaging.core.journal.impl.JournalFile;
+import org.jboss.messaging.core.journal.impl.JournalFileImpl;
import org.jboss.messaging.core.journal.impl.JournalImpl;
import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
@@ -55,10 +61,55 @@
// General tests
// =============
+ public void testControlFile() throws Exception
+ { // Round trip: write a control file through JournalCompactor, read it back, compare the file names in order
+ ArrayList<JournalFile> dataFiles = new ArrayList<JournalFile>();
+
+ for (int i = 0; i < 5; i++) // five entries for the "data files" list
+ {
+ SequentialFile file = fileFactory.createSequentialFile("file-" + i + ".tst", 1);
+ dataFiles.add(new JournalFileImpl(file, 0, 0));
+ }
+
+ ArrayList<JournalFile> newFiles = new ArrayList<JournalFile>();
+
+ for (int i = 0; i < 3; i++) // three entries for the "new files" list
+ {
+ SequentialFile file = fileFactory.createSequentialFile("file-" + i + ".tst.new", 1);
+ newFiles.add(new JournalFileImpl(file, 0, 0));
+ }
+
+ JournalCompactor.writeControlFile(fileFactory, dataFiles, newFiles);
+
+ ArrayList<String> strDataFiles = new ArrayList<String>(); // empty here; expected to be filled by readControlFile
+
+ ArrayList<String> strNewFiles = new ArrayList<String>(); // empty here; expected to be filled by readControlFile
+
+ assertNotNull(JournalCompactor.readControlFile(fileFactory, strDataFiles, strNewFiles));
+
+ assertEquals(dataFiles.size(), strDataFiles.size());
+ assertEquals(newFiles.size(), strNewFiles.size());
+
+ Iterator<String> iterDataFiles = strDataFiles.iterator();
+ for (JournalFile file : dataFiles) // names read back must match the written ones, position by position
+ {
+ assertEquals(file.getFile().getFileName(), iterDataFiles.next());
+ }
+
+ assertFalse(iterDataFiles.hasNext()); // nothing left over beyond the names that were written
+
+ Iterator<String> iterNewFiles = strNewFiles.iterator();
+ for (JournalFile file : newFiles) // same ordered comparison; leftovers are covered only by the size check above
+ {
+ assertEquals(file.getFile().getFileName(), iterNewFiles.next());
+ }
+ }
+
public void testCrashRenamingFiles() throws Exception
{
+ internalCompactTest(true, false, false, false, false, false, false, false, true, false, false); // last three flags: createControlFile=true, deleteControlFile=false, renameFilesAfterCompacting=false
}
-
+
public void testCrashDuringCompacting() throws Exception
{
}
@@ -73,43 +124,43 @@
public void testCompactwithPendingCommit() throws Exception
{
- internalCompactTest(false, false, false, false, false, true, false, false);
+ internalCompactTest(false, false, false, false, false, true, false, false, true, true, true);
}
public void testCompactwithDelayedCommit() throws Exception
{
- internalCompactTest(false, false, false, false, false, true, false, true);
+ internalCompactTest(false, false, false, false, false, true, false, true, true, true, true);
}
public void testCompactwithPendingCommitFollowedByDelete() throws Exception
{
- internalCompactTest(false, false, false, false, false, true, true, false);
+ internalCompactTest(false, false, false, false, false, true, true, false, true, true, true);
}
public void testCompactwithConcurrentUpdateAndDeletes() throws Exception
{
- internalCompactTest(true, false, true, true, false, false, false, false);
+ internalCompactTest(true, false, true, true, false, false, false, false, true, true, true);
tearDown();
setUp();
- internalCompactTest(true, false, true, false, true, false, false, false);
+ internalCompactTest(true, false, true, false, true, false, false, false, true, true, true);
}
public void testCompactwithConcurrentDeletes() throws Exception
{
- internalCompactTest(true, false, false, true, false, false, false, false);
+ internalCompactTest(true, false, false, true, false, false, false, false, true, true, true);
tearDown();
setUp();
- internalCompactTest(true, false, false, false, true, false, false, false);
+ internalCompactTest(true, false, false, false, true, false, false, false, true, true, true);
}
public void testCompactwithConcurrentUpdates() throws Exception
{
- internalCompactTest(true, false, true, false, false, false, false, false);
+ internalCompactTest(true, false, true, false, false, false, false, false, true, true, true);
}
public void testCompactWithConcurrentAppend() throws Exception
{
- internalCompactTest(true, true, false, false, false, false, false, false);
+ internalCompactTest(true, true, false, false, false, false, false, false, true, true, true);
}
private void internalCompactTest(final boolean regularAdd,
@@ -119,7 +170,10 @@
boolean performNonTransactionalDelete,
final boolean pendingTransactions,
final boolean deleteTransactRecords,
- final boolean delayCommit) throws Exception
+ final boolean delayCommit,
+ final boolean createControlFile,
+ final boolean deleteControlFile,
+ final boolean renameFilesAfterCompacting) throws Exception
{
if (performNonTransactionalDelete)
{
@@ -129,7 +183,7 @@
{
performNonTransactionalDelete = false;
}
-
+
setup(50, 60 * 1024, true);
ArrayList<Long> liveIDs = new ArrayList<Long>();
@@ -140,7 +194,39 @@
final CountDownLatch latchWait = new CountDownLatch(1);
journal = new JournalImpl(fileSize, minFiles, fileFactory, filePrefix, fileExtension, maxAIO)
{
+
@Override
+ protected SequentialFile createControlFile(List<JournalFile> files, List<JournalFile> newFiles) throws Exception
+ {
+ if (createControlFile) // test flag: true keeps the superclass behaviour
+ {
+ return super.createControlFile(files, newFiles);
+ }
+ else
+ {
+ throw new IllegalStateException("Error creating control file"); // simulated failure while creating the control file
+ }
+ }
+
+ @Override
+ protected void deleteControlFile(SequentialFile controlFile) throws Exception
+ {
+ if (deleteControlFile) // test flag: when false the control file is deliberately left in place
+ {
+ super.deleteControlFile(controlFile);
+ }
+ }
+
+ @Override
+ protected void renameFiles(List<JournalFile> oldFiles, List<JournalFile> newFiles) throws Exception
+ {
+ if (renameFilesAfterCompacting) // test flag: when false the rename step is skipped entirely
+ {
+ super.renameFiles(oldFiles, newFiles);
+ }
+ }
+
+ @Override
public void onCompactDone()
{
latchDone.countDown();
@@ -335,7 +421,7 @@
}
/** Some independent adds and updates */
- for (int i = 0; i < 1000; i++)
+ for (int i = 0; i < 1000; i++)
{
long id = idGenerator.generateID();
add(id);
@@ -345,7 +431,7 @@
{
journal.forceMoveNextFile();
}
- }
+ }
journal.forceMoveNextFile();
latchWait.countDown();
@@ -374,7 +460,10 @@
add(idGenerator.generateID());
- journal.compact();
+ if (createControlFile && deleteControlFile && renameFilesAfterCompacting)
+ {
+ journal.compact();
+ }
stopJournal();
createJournal();
@@ -398,7 +487,7 @@
public void testSimpleCompacting() throws Exception
{
- setup(50, 60 * 1024, true);
+ setup(2, 60 * 1024, true);
createJournal();
startJournal();
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-06-30 17:42:50 UTC (rev 7503)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-06-30 22:30:18 UTC (rev 7504)
@@ -340,10 +340,10 @@
{
data.position(0);
}
-
+
this.notifyAll();
}
-
+
public synchronized void waitForClose() throws Exception
{
while (open)
@@ -352,8 +352,6 @@
}
}
-
-
public void delete() throws Exception
{
if (!open)
@@ -613,7 +611,7 @@
public void write(MessagingBuffer bytes, boolean sync, IOCallback callback) throws Exception
{
write(ByteBuffer.wrap(bytes.array()), sync, callback);
-
+
}
/* (non-Javadoc)
@@ -624,6 +622,14 @@
write(ByteBuffer.wrap(bytes.array()), sync);
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFile#exists()
+ */
+ public boolean exists()
+ {
+ return fileMap.get(fileName) != null; // "exists" here means fileMap holds a non-null entry under fileName
+ }
+
}
/* (non-Javadoc)
More information about the jboss-cvs-commits
mailing list