[jboss-cvs] JBoss Messaging SVN: r7496 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/integration/journal and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 29 14:02:57 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-29 14:02:57 -0400 (Mon, 29 Jun 2009)
New Revision: 7496
Added:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/TransactionCallback.java
Modified:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
Log:
Separating Compactor and JournalTransaction as a separate class
Added: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java (rev 0)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java 2009-06-29 18:02:57 UTC (rev 7496)
@@ -0,0 +1,366 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.journal.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.buffers.ChannelBuffer;
+import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl.JournalRecord;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.DataConstants;
+import org.jboss.messaging.utils.Pair;
+
+/**
+ * A JournalCompactor
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalCompactor implements JournalReaderCallback
+{
+
+ private static final Logger log = Logger.getLogger(JournalCompactor.class);
+
+ final JournalImpl journal;
+
+ final SequentialFileFactory fileFactory;
+
+ JournalFile currentFile;
+
+ SequentialFile sequentialFile;
+
+ int fileID;
+
+ ChannelBuffer channelWrapper;
+
+ int nextOrderingID;
+
+ final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
+
+ final Map<Long, JournalRecord> recordsSnapshot;
+
+ final Map<Long, JournalTransaction> pendingTransactions;
+
+ final Map<Long, JournalRecord> newRecords = new HashMap<Long, JournalRecord>();
+
+ final Map<Long, JournalTransaction> newTransactions = new HashMap<Long, JournalTransaction>();
+
+ final LinkedList<Pair<Long, JournalFile>> pendingUpdates = new LinkedList<Pair<Long, JournalFile>>();
+
+ final LinkedList<Pair<Long, JournalFile>> pendingDeletes = new LinkedList<Pair<Long, JournalFile>>();
+
+ public JournalCompactor(final SequentialFileFactory fileFactory,
+ final JournalImpl journal,
+ Map<Long, JournalRecord> recordsSnapshot,
+ Map<Long, JournalTransaction> pendingTransactions,
+ int firstFileID)
+ {
+ this.fileFactory = fileFactory;
+ this.journal = journal;
+ this.recordsSnapshot = recordsSnapshot;
+ this.nextOrderingID = firstFileID;
+ this.pendingTransactions = pendingTransactions;
+ }
+
+ /**
+ * @param id
+ * @param usedFile
+ */
+ public void addPendingDelete(long id, JournalFile usedFile)
+ {
+ pendingDeletes.add(new Pair<Long, JournalFile>(id, usedFile));
+ }
+
+ /**
+ * @param id
+ * @param usedFile
+ */
+ public void addPendingUpdate(long id, JournalFile usedFile)
+ {
+ pendingUpdates.add(new Pair<Long, JournalFile>(id, usedFile));
+ }
+
+ public boolean lookupRecord(long id)
+ {
+ return recordsSnapshot.get(id) != null;
+ }
+
+ private void checkSize(int size) throws Exception
+ {
+ if (channelWrapper == null)
+ {
+ openFile();
+ }
+ else
+ {
+ if (channelWrapper.writerIndex() + size > channelWrapper.capacity())
+ {
+ openFile();
+ }
+ }
+ }
+
+ public void flush() throws Exception
+ {
+ if (channelWrapper != null)
+ {
+ sequentialFile.position(0);
+ sequentialFile.write(channelWrapper.toByteBuffer(), true);
+ sequentialFile.close();
+ newDataFiles.add(currentFile);
+ }
+
+ channelWrapper = null;
+ }
+
+ /**
+ * @throws Exception
+ */
+ private void openFile() throws Exception
+ {
+ flush();
+
+ ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize());
+ channelWrapper = ChannelBuffers.wrappedBuffer(bufferWrite);
+
+ currentFile = journal.getFile(false, false);
+ sequentialFile = currentFile.getFile();
+ sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+ sequentialFile.open(1);
+ fileID = nextOrderingID++;
+ currentFile = new JournalFileImpl(sequentialFile, fileID, fileID);
+
+ channelWrapper.writeInt(fileID);
+ channelWrapper.writeInt(fileID);
+ }
+
+ public void addRecord(RecordInfo info) throws Exception
+ {
+ if (recordsSnapshot.get(info.id) != null)
+ {
+ int size = JournalImpl.SIZE_ADD_RECORD + info.data.length;
+
+ checkSize(size);
+
+ journal.writeAddRecord(fileID,
+ info.id,
+ info.getUserRecordType(),
+ new JournalImpl.ByteArrayEncoding(info.data),
+ size,
+ channelWrapper);
+
+ newRecords.put(info.id, new JournalRecord(currentFile));
+ }
+ }
+
+ public void addRecordTX(long transactionID, RecordInfo info) throws Exception
+ {
+ if (pendingTransactions.get(transactionID) != null)
+ {
+ JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+
+ int size = JournalImpl.SIZE_ADD_RECORD_TX + info.data.length;
+
+ checkSize(size);
+
+ newTransaction.addPositive(currentFile, info.id);
+
+ journal.writeAddRecordTX(fileID,
+ transactionID,
+ info.id,
+ info.getUserRecordType(),
+ new JournalImpl.ByteArrayEncoding(info.data),
+ size,
+ channelWrapper);
+ }
+ else
+ {
+ // Will try it as a regular record, the method addRecord will validate if this is a live record or not
+ addRecord(info);
+ }
+ }
+
+ public void commitRecord(long transactionID, int numberOfRecords) throws Exception
+ {
+ JournalTransaction pendingTx = pendingTransactions.get(transactionID);
+
+ if (pendingTx != null)
+ {
+ // Sanity check, this should never happen
+ throw new IllegalStateException("Inconsistency during compacting: CommitRecord ID = " + transactionID +
+ " for an already committed transaction during compacting");
+ }
+ }
+
+ public void deleteRecord(long recordID) throws Exception
+ {
+ if (newRecords.get(recordID) != null)
+ {
+ // Sanity check, it should never happen
+ throw new IllegalStateException("Inconsistency during compacting: Delete record being read on an existent record");
+ }
+
+ }
+
+ public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
+ {
+ if (pendingTransactions.get(transactionID) != null)
+ {
+ JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+
+ int size = JournalImpl.SIZE_DELETE_RECORD_TX + info.data.length;
+
+ checkSize(size);
+
+ journal.writeDeleteRecordTransactional(fileID,
+ transactionID,
+ info.id,
+ new JournalImpl.ByteArrayEncoding(info.data),
+ size,
+ channelWrapper);
+
+ newTransaction.addNegative(currentFile, info.id);
+ }
+ }
+
+ public void markAsDataFile(JournalFile file)
+ {
+ // nothing to be done here
+ }
+
+ public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+ {
+ if (pendingTransactions.get(transactionID) != null)
+ {
+
+ JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+
+ int size = JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD + extraData.length + DataConstants.SIZE_INT;
+
+ checkSize(size);
+
+ journal.writeTransaction(fileID,
+ JournalImpl.PREPARE_RECORD,
+ transactionID,
+ newTransaction,
+ new JournalImpl.ByteArrayEncoding(extraData),
+ size,
+ newTransaction.getCounter(currentFile),
+ channelWrapper);
+
+ newTransaction.prepare(currentFile);
+
+ }
+ }
+
+ public void rollbackRecord(long transactionID) throws Exception
+ {
+ if (pendingTransactions.get(transactionID) != null)
+ {
+ // Sanity check, this should never happen
+ throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
+ " for an already rolled back transaction during compacting");
+ }
+ }
+
+ public void updateRecord(RecordInfo info) throws Exception
+ {
+ if (recordsSnapshot.get(info.id) != null)
+ {
+ int size = JournalImpl.SIZE_UPDATE_RECORD + info.data.length;
+
+ checkSize(size);
+
+ JournalRecord newRecord = newRecords.get(info.id);
+
+ if (newRecord == null)
+ {
+ log.warn("Couldn't find addRecord information for record " + info.id + " during compacting");
+ }
+
+ journal.writeUpdateRecord(fileID,
+ info.id,
+ info.userRecordType,
+ new JournalImpl.ByteArrayEncoding(info.data),
+ size,
+ channelWrapper);
+
+ newRecord.addUpdateFile(currentFile);
+
+ }
+ }
+
+ public void updateRecordTX(long transactionID, RecordInfo info) throws Exception
+ {
+
+ if (pendingTransactions.get(transactionID) != null)
+ {
+ JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+
+ int size = JournalImpl.SIZE_UPDATE_RECORD_TX + info.data.length;
+
+ checkSize(size);
+
+ journal.writeUpdateRecordTX(fileID,
+ transactionID,
+ info.id,
+ info.userRecordType,
+ new JournalImpl.ByteArrayEncoding(info.data),
+ size,
+ channelWrapper);
+
+ newTransaction.addPositive(currentFile, info.id);
+ }
+ else
+ {
+
+ updateRecord(info);
+ }
+ }
+
+ /**
+ * @param transactionID
+ * @return
+ */
+ private JournalTransaction getNewJournalTransaction(long transactionID)
+ {
+ JournalTransaction newTransaction = newTransactions.get(transactionID);
+ if (newTransaction == null)
+ {
+ newTransaction = new JournalTransaction(journal);
+ newTransactions.put(transactionID, newTransaction);
+ }
+ return newTransaction;
+ }
+
+}
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-29 14:55:19 UTC (rev 7495)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-29 18:02:57 UTC (rev 7496)
@@ -26,11 +26,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -53,7 +51,6 @@
import org.jboss.messaging.core.buffers.ChannelBuffer;
import org.jboss.messaging.core.buffers.ChannelBuffers;
-import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.LoaderCallback;
@@ -64,8 +61,8 @@
import org.jboss.messaging.core.journal.TestableJournal;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.DataConstants;
import org.jboss.messaging.utils.Pair;
-import org.jboss.messaging.utils.VariableLatch;
import org.jboss.messaging.utils.concurrent.LinkedBlockingDeque;
/**
@@ -115,47 +112,41 @@
// The sizes of primitive types
- private static final int SIZE_LONG = 8;
-
- private static final int SIZE_INT = 4;
-
- private static final int SIZE_BYTE = 1;
-
public static final int MIN_FILE_SIZE = 1024;
- public static final int SIZE_HEADER = SIZE_INT * 2;
+ public static final int SIZE_HEADER = DataConstants.SIZE_INT * 2;
- public static final int BASIC_SIZE = SIZE_BYTE + SIZE_INT + SIZE_INT;
+ public static final int BASIC_SIZE = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_INT;
- public static final int SIZE_ADD_RECORD = BASIC_SIZE + SIZE_LONG + SIZE_BYTE + SIZE_INT /* + record.length */;
+ public static final int SIZE_ADD_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT /* + record.length */;
// Record markers - they must be all unique
public static final byte ADD_RECORD = 11;
- public static final byte SIZE_UPDATE_RECORD = BASIC_SIZE + SIZE_LONG + SIZE_BYTE + SIZE_INT /* + record.length */;
+ public static final byte SIZE_UPDATE_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT /* + record.length */;
public static final byte UPDATE_RECORD = 12;
- public static final int SIZE_ADD_RECORD_TX = BASIC_SIZE + SIZE_LONG + SIZE_BYTE + SIZE_LONG + SIZE_INT /* + record.length */;
+ public static final int SIZE_ADD_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT /* + record.length */;
public static final byte ADD_RECORD_TX = 13;
- public static final int SIZE_UPDATE_RECORD_TX = BASIC_SIZE + SIZE_LONG + SIZE_BYTE + SIZE_LONG + SIZE_INT /* + record.length */;
+ public static final int SIZE_UPDATE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT /* + record.length */;
public static final byte UPDATE_RECORD_TX = 14;
- public static final int SIZE_DELETE_RECORD_TX = BASIC_SIZE + SIZE_LONG + SIZE_LONG + SIZE_INT /* + record.length */;
+ public static final int SIZE_DELETE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + DataConstants.SIZE_INT /* + record.length */;
public static final byte DELETE_RECORD_TX = 15;
- public static final int SIZE_DELETE_RECORD = BASIC_SIZE + SIZE_LONG;
+ public static final int SIZE_DELETE_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG;
public static final byte DELETE_RECORD = 16;
- public static final int SIZE_COMPLETE_TRANSACTION_RECORD = BASIC_SIZE + SIZE_LONG + SIZE_INT;
+ public static final int SIZE_COMPLETE_TRANSACTION_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
- public static final int SIZE_PREPARE_RECORD = SIZE_COMPLETE_TRANSACTION_RECORD + SIZE_INT;
+ public static final int SIZE_PREPARE_RECORD = SIZE_COMPLETE_TRANSACTION_RECORD + DataConstants.SIZE_INT;
public static final byte PREPARE_RECORD = 17;
@@ -163,7 +154,7 @@
public static final byte COMMIT_RECORD = 18;
- public static final int SIZE_ROLLBACK_RECORD = BASIC_SIZE + SIZE_LONG;
+ public static final int SIZE_ROLLBACK_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG;
public static final byte ROLLBACK_RECORD = 19;
@@ -200,8 +191,8 @@
// Compacting may replace this structure
private volatile ConcurrentMap<Long, JournalTransaction> pendingTransactions = new ConcurrentHashMap<Long, JournalTransaction>();
- // This will be filled only while the Compactor is being done
- private volatile Compactor compactor;
+ // This will be set only while the JournalCompactor is being executed
+ private volatile JournalCompactor compactor;
private ExecutorService filesExecutor = null;
@@ -268,6 +259,13 @@
this.maxAIO = maxAIO;
}
+
+ // Public methods (used by package members) (those are not part of the JournalImpl interface)
+
+ public Map<Long, JournalRecord> getRecords()
+ {
+ return records;
+ }
// Journal implementation
// ----------------------------------------------------------------
@@ -363,7 +361,8 @@
{
JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
- // record== null here could only mean there is a compactor, and computing the delete should be done after compacting is done
+ // record== null here could only mean there is a compactor, and computing the delete should be done after
+ // compacting is done
if (posFiles == null)
{
compactor.addPendingUpdate(id, usedFile);
@@ -429,7 +428,8 @@
{
JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
- // record== null here could only mean there is a compactor, and computing the delete should be done after compacting is done
+ // record== null here could only mean there is a compactor, and computing the delete should be done after
+ // compacting is done
if (record == null)
{
compactor.addPendingDelete(id, usedFile);
@@ -623,7 +623,7 @@
if (sync)
{
- tx.syncPreviousFiles();
+ tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
}
compactingLock.readLock().lock();
@@ -631,7 +631,7 @@
try
{
- int size = SIZE_COMPLETE_TRANSACTION_RECORD + transactionData.getEncodeSize() + SIZE_INT;
+ int size = SIZE_COMPLETE_TRANSACTION_RECORD + transactionData.getEncodeSize() + DataConstants.SIZE_INT;
ChannelBuffer bb = newBuffer(size);
writeTransaction(-1, PREPARE_RECORD, txID, tx, transactionData, size, -1, bb);
@@ -855,7 +855,13 @@
recordsSnapshot = JournalImpl.this.records;
pendingTransactions = JournalImpl.this.pendingTransactions;
+ pendingTransactions.putAll(this.pendingTransactions);
+ for (Map.Entry<Long, JournalTransaction> entry : pendingTransactions.entrySet())
+ {
+ System.out.println("TransactionID = " + entry.getKey());
+ }
+
JournalImpl.this.records = new ConcurrentHashMap<Long, JournalRecord>();
records = new ConcurrentHashMap<Long, JournalRecord>();
@@ -864,7 +870,7 @@
dataFiles.clear();
- this.compactor = new Compactor(recordsSnapshot, pendingTransactions, dataFilesToProcess.get(0).getFileID());
+ this.compactor = new JournalCompactor(fileFactory, this, recordsSnapshot, pendingTransactions, dataFilesToProcess.get(0).getFileID());
}
finally
@@ -872,7 +878,7 @@
compactingLock.writeLock().unlock();
}
- // Read the files, and use the Compactor class to create the new outputFiles, and the new collections as well
+ // Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as well
JournalFile previousFile = null;
for (final JournalFile file : dataFilesToProcess)
{
@@ -921,8 +927,8 @@
JournalRecord updateRecord = this.records.get(pendingRecord.a);
updateRecord.addUpdateFile(pendingRecord.b);
}
-
- for (Pair<Long, JournalFile> pendingRecord: compactor.pendingDeletes)
+
+ for (Pair<Long, JournalFile> pendingRecord : compactor.pendingDeletes)
{
JournalRecord deleteRecord = this.records.remove(pendingRecord.a);
deleteRecord.delete(pendingRecord.b);
@@ -946,6 +952,30 @@
}
finally
{
+ // An Exception was probably thrown, and the compactor was not cleared
+ if (compactor != null)
+ {
+ try
+ {
+ compactor.flush();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ if (compactor.sequentialFile != null)
+ {
+ try
+ {
+ compactor.sequentialFile = null;
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ compactor = null;
+ }
autoReclaim = previousReclaimValue;
}
@@ -1025,315 +1055,6 @@
return tmpRenameFile;
}
- class Compactor implements JournalReaderCallback
- {
-
- JournalFile currentFile;
-
- SequentialFile sequentialFile;
-
- int fileID;
-
- ChannelBuffer channelWrapper;
-
- int nextOrderingID;
-
- final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
-
- final Map<Long, JournalRecord> recordsSnapshot;
-
- final Map<Long, JournalTransaction> pendingTransactions;
-
- final Map<Long, JournalRecord> newRecords = new HashMap<Long, JournalRecord>();
-
- final Map<Long, JournalTransaction> newTransactions = new HashMap<Long, JournalTransaction>();
-
- final LinkedList<Pair<Long, JournalFile>> pendingUpdates = new LinkedList<Pair<Long, JournalFile>>();
-
- final LinkedList<Pair<Long, JournalFile>> pendingDeletes = new LinkedList<Pair<Long, JournalFile>>();
-
- public Compactor(Map<Long, JournalRecord> recordsSnapshot,
- Map<Long, JournalTransaction> pendingTransactions,
- int firstFileID)
- {
- this.recordsSnapshot = recordsSnapshot;
- this.nextOrderingID = firstFileID;
- this.pendingTransactions = pendingTransactions;
- }
-
- /**
- * @param id
- * @param usedFile
- */
- public void addPendingDelete(long id, JournalFile usedFile)
- {
- pendingDeletes.add(new Pair<Long,JournalFile>(id, usedFile));
- }
-
- /**
- * @param id
- * @param usedFile
- */
- public void addPendingUpdate(long id, JournalFile usedFile)
- {
- pendingUpdates.add(new Pair<Long, JournalFile>(id, usedFile));
- }
-
- public boolean lookupRecord(long id)
- {
- return recordsSnapshot.get(id) != null;
- }
-
- private void checkSize(int size) throws Exception
- {
- if (channelWrapper == null)
- {
- openFile();
- }
- else
- {
- if (channelWrapper.writerIndex() + size > channelWrapper.capacity())
- {
- openFile();
- }
- }
- }
-
- public void flush() throws Exception
- {
- if (channelWrapper != null)
- {
- sequentialFile.position(0);
- sequentialFile.write(channelWrapper.toByteBuffer(), true);
- sequentialFile.close();
- newDataFiles.add(currentFile);
- }
-
- channelWrapper = null;
- }
-
- /**
- * @throws Exception
- */
- private void openFile() throws Exception
- {
- flush();
-
- ByteBuffer bufferWrite = fileFactory.newBuffer(fileSize);
- channelWrapper = ChannelBuffers.wrappedBuffer(bufferWrite);
-
- currentFile = getFile(false, false);
- sequentialFile = currentFile.getFile();
- sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
- sequentialFile.open(1);
- fileID = nextOrderingID++;
- currentFile = new JournalFileImpl(sequentialFile, fileID, fileID);
-
- channelWrapper.writeInt(fileID);
- channelWrapper.writeInt(fileID);
- }
-
- public void addRecord(RecordInfo info) throws Exception
- {
- if (recordsSnapshot.get(info.id) != null)
- {
- int size = SIZE_ADD_RECORD + info.data.length;
-
- checkSize(size);
-
- writeAddRecord(fileID,
- info.id,
- info.getUserRecordType(),
- new ByteArrayEncoding(info.data),
- size,
- channelWrapper);
-
- newRecords.put(info.id, new JournalRecord(currentFile));
- }
- }
-
- public void addRecordTX(long transactionID, RecordInfo info) throws Exception
- {
- if (pendingTransactions.get(transactionID) != null)
- {
- JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
-
- int size = SIZE_ADD_RECORD_TX + info.data.length;
-
- checkSize(size);
-
- newTransaction.addPositive(currentFile, info.id);
-
- writeAddRecordTX(fileID,
- transactionID,
- info.id,
- info.getUserRecordType(),
- new ByteArrayEncoding(info.data),
- size,
- channelWrapper);
- }
- else
- {
- // Will try it as a regular record, the method addRecord will validate if this is a live record or not
- addRecord(info);
- }
- }
-
- public void commitRecord(long transactionID, int numberOfRecords) throws Exception
- {
- JournalTransaction pendingTx = pendingTransactions.get(transactionID);
-
- if (pendingTx != null)
- {
- // Sanity check, this should never happen
- throw new IllegalStateException("Inconsistency during compacting: CommitRecord ID = " + transactionID +
- " for an already committed transaction during compacting");
- }
- }
-
- public void deleteRecord(long recordID) throws Exception
- {
- if (records.get(recordID) != null)
- {
- // Sanity check, it should never happen
- throw new IllegalStateException("Inconsistency during compacting: Delete record being read on an existent record");
- }
-
- }
-
- public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
- {
- if (pendingTransactions.get(transactionID) != null)
- {
- JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
-
- int size = SIZE_DELETE_RECORD_TX + info.data.length;
-
- checkSize(size);
-
- writeDeleteRecordTransactional(fileID,
- transactionID,
- info.id,
- new ByteArrayEncoding(info.data),
- size,
- channelWrapper);
-
- newTransaction.addNegative(currentFile, info.id);
- }
- }
-
- public void markAsDataFile(JournalFile file)
- {
- // nothing to be done here
- }
-
- public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
- {
- if (pendingTransactions.get(transactionID) != null)
- {
-
- JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
-
- int size = SIZE_COMPLETE_TRANSACTION_RECORD + extraData.length + SIZE_INT;
-
- checkSize(size);
-
- writeTransaction(fileID,
- PREPARE_RECORD,
- transactionID,
- newTransaction,
- new ByteArrayEncoding(extraData),
- size,
- newTransaction.getCounter(currentFile),
- channelWrapper);
-
- newTransaction.prepare(currentFile);
-
- }
- }
-
- public void rollbackRecord(long transactionID) throws Exception
- {
- if (pendingTransactions.get(transactionID) != null)
- {
- // Sanity check, this should never happen
- throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
- " for an already rolled back transaction during compacting");
- }
- }
-
- public void updateRecord(RecordInfo info) throws Exception
- {
- if (recordsSnapshot.get(info.id) != null)
- {
- int size = SIZE_UPDATE_RECORD + info.data.length;
-
- checkSize(size);
-
- JournalRecord newRecord = newRecords.get(info.id);
-
- if (newRecord == null)
- {
- log.warn("Couldn't find addRecord information for record " + info.id + " during compacting");
- }
-
- writeUpdateRecord(fileID,
- info.id,
- info.userRecordType,
- new ByteArrayEncoding(info.data),
- size,
- channelWrapper);
-
- newRecord.addUpdateFile(currentFile);
-
- }
- }
-
- public void updateRecordTX(long transactionID, RecordInfo info) throws Exception
- {
-
- if (pendingTransactions.get(transactionID) != null)
- {
- JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
-
- int size = SIZE_UPDATE_RECORD_TX + info.data.length;
-
- checkSize(size);
-
- writeUpdateRecordTX(fileID,
- transactionID,
- info.id,
- info.userRecordType,
- new ByteArrayEncoding(info.data),
- size,
- channelWrapper);
-
- newTransaction.addPositive(currentFile, info.id);
- }
- else
- {
-
- updateRecord(info);
- }
- }
-
- /**
- * @param transactionID
- * @return
- */
- private JournalTransaction getNewJournalTransaction(long transactionID)
- {
- JournalTransaction newTransaction = newTransactions.get(transactionID);
- if (newTransaction == null)
- {
- newTransaction = new JournalTransaction();
- newTransactions.put(transactionID, newTransaction);
- }
- return newTransaction;
- }
-
- }
-
private boolean isInvalidSize(int bufferPos, int size)
{
if (size < 0)
@@ -1490,7 +1211,7 @@
if (tnp == null)
{
- tnp = new JournalTransaction();
+ tnp = new JournalTransaction(JournalImpl.this);
pendingTransactions.put(transactionID, tnp);
}
@@ -1522,7 +1243,7 @@
if (tnp == null)
{
- tnp = new JournalTransaction();
+ tnp = new JournalTransaction(JournalImpl.this);
pendingTransactions.put(transactionID, tnp);
}
@@ -1558,7 +1279,7 @@
if (journalTransaction == null)
{
- journalTransaction = new JournalTransaction();
+ journalTransaction = new JournalTransaction(JournalImpl.this);
pendingTransactions.put(transactionID, journalTransaction);
}
@@ -2096,299 +1817,314 @@
{
file.getFile().open(1);
-
- ByteBuffer wholeFileBuffer = fileFactory.newBuffer((int)file.getFile().size());
-
- int bytesRead = file.getFile().read(wholeFileBuffer);
-
- if (bytesRead != file.getFile().size())
+ ByteBuffer wholeFileBuffer = null;
+ try
{
- throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
- }
- wholeFileBuffer.position(0);
+ wholeFileBuffer = fileFactory.newBuffer((int)file.getFile().size());
- // First long is the ordering timestamp, we just jump its position
- wholeFileBuffer.position(SIZE_HEADER);
+ int bytesRead = file.getFile().read(wholeFileBuffer);
- int lastDataPos = SIZE_HEADER;
-
- while (wholeFileBuffer.hasRemaining())
- {
- final int pos = wholeFileBuffer.position();
-
- byte recordType = wholeFileBuffer.get();
-
- if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
+ if (bytesRead != file.getFile().size())
{
- // I - We scan for any valid record on the file. If a hole
- // happened on the middle of the file we keep looking until all
- // the possibilities are gone
- continue;
+ throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
}
- if (isInvalidSize(wholeFileBuffer.position(), SIZE_INT))
- {
- reader.markAsDataFile(file);
+ wholeFileBuffer.position(0);
- wholeFileBuffer.position(pos + 1);
- // II - Ignore this record, lets keep looking
- continue;
- }
+ // First long is the ordering timestamp, we just jump its position
+ wholeFileBuffer.position(SIZE_HEADER);
- // III - Every record has the file-id.
- // This is what supports us from not re-filling the whole file
- int readFileId = wholeFileBuffer.getInt();
+ int lastDataPos = SIZE_HEADER;
- long transactionID = 0;
-
- if (isTransaction(recordType))
+ while (wholeFileBuffer.hasRemaining())
{
- if (isInvalidSize(wholeFileBuffer.position(), SIZE_LONG))
+ final int pos = wholeFileBuffer.position();
+
+ byte recordType = wholeFileBuffer.get();
+
+ if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
{
- wholeFileBuffer.position(pos + 1);
- reader.markAsDataFile(file);
+ // I - We scan for any valid record on the file. If a hole
+ // happened on the middle of the file we keep looking until all
+ // the possibilities are gone
continue;
}
- transactionID = wholeFileBuffer.getLong();
- }
-
- long recordID = 0;
-
- // If prepare or commit
- if (!isCompleteTransaction(recordType))
- {
- if (isInvalidSize(wholeFileBuffer.position(), SIZE_LONG))
+ if (isInvalidSize(wholeFileBuffer.position(), DataConstants.SIZE_INT))
{
- wholeFileBuffer.position(pos + 1);
reader.markAsDataFile(file);
+
+ wholeFileBuffer.position(pos + 1);
+ // II - Ignore this record, lets keep looking
continue;
}
- recordID = wholeFileBuffer.getLong();
- }
+ // III - Every record has the file-id.
+ // This is what supports us from not re-filling the whole file
+ int readFileId = wholeFileBuffer.getInt();
- // We use the size of the record to validate the health of the
- // record.
- // (V) We verify the size of the record
+ long transactionID = 0;
- // The variable record portion used on Updates and Appends
- int variableSize = 0;
-
- // Used to hold extra data on transaction prepares
- int preparedTransactionExtraDataSize = 0;
-
- byte userRecordType = 0;
-
- byte record[] = null;
-
- if (isContainsBody(recordType))
- {
- if (isInvalidSize(wholeFileBuffer.position(), SIZE_INT))
+ if (isTransaction(recordType))
{
- wholeFileBuffer.position(pos + 1);
- reader.markAsDataFile(file);
- continue;
+ if (isInvalidSize(wholeFileBuffer.position(), DataConstants.SIZE_LONG))
+ {
+ wholeFileBuffer.position(pos + 1);
+ reader.markAsDataFile(file);
+ continue;
+ }
+
+ transactionID = wholeFileBuffer.getLong();
}
- variableSize = wholeFileBuffer.getInt();
+ long recordID = 0;
- if (isInvalidSize(wholeFileBuffer.position(), variableSize))
+ // If prepare or commit
+ if (!isCompleteTransaction(recordType))
{
- wholeFileBuffer.position(pos + 1);
- continue;
- }
+ if (isInvalidSize(wholeFileBuffer.position(), DataConstants.SIZE_LONG))
+ {
+ wholeFileBuffer.position(pos + 1);
+ reader.markAsDataFile(file);
+ continue;
+ }
- if (recordType != DELETE_RECORD_TX)
- {
- userRecordType = wholeFileBuffer.get();
+ recordID = wholeFileBuffer.getLong();
}
- record = new byte[variableSize];
+ // We use the size of the record to validate the health of the
+ // record.
+ // (V) We verify the size of the record
- wholeFileBuffer.get(record);
- }
+ // The variable record portion used on Updates and Appends
+ int variableSize = 0;
- // Case this is a transaction, this will contain the number of pendingTransactions on a transaction, at the
- // currentFile
- int transactionCheckNumberOfRecords = 0;
+ // Used to hold extra data on transaction prepares
+ int preparedTransactionExtraDataSize = 0;
- if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
- {
- transactionCheckNumberOfRecords = wholeFileBuffer.getInt();
+ byte userRecordType = 0;
- if (recordType == PREPARE_RECORD)
+ byte record[] = null;
+
+ if (isContainsBody(recordType))
{
- // Add the variable size required for preparedTransactions
- preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
- }
- variableSize = 0;
- }
+ if (isInvalidSize(wholeFileBuffer.position(), DataConstants.SIZE_INT))
+ {
+ wholeFileBuffer.position(pos + 1);
+ reader.markAsDataFile(file);
+ continue;
+ }
- int recordSize = getRecordSize(recordType);
+ variableSize = wholeFileBuffer.getInt();
- // VI - this is completing V, We will validate the size at the end
- // of the record,
- // But we avoid buffer overflows by damaged data
- if (isInvalidSize(pos, recordSize + variableSize + preparedTransactionExtraDataSize))
- {
- // Avoid a buffer overflow caused by damaged data... continue
- // scanning for more pendingTransactions...
- trace("Record at position " + pos +
- " recordType = " +
- recordType +
- " file:" +
- file.getFile().getFileName() +
- " recordSize: " +
- recordSize +
- " variableSize: " +
- variableSize +
- " preparedTransactionExtraDataSize: " +
- preparedTransactionExtraDataSize +
- " is corrupted and it is being ignored (II)");
- // If a file has damaged pendingTransactions, we make it a dataFile, and the
- // next reclaiming will fix it
- reader.markAsDataFile(file);
- wholeFileBuffer.position(pos + 1);
+ if (isInvalidSize(wholeFileBuffer.position(), variableSize))
+ {
+ wholeFileBuffer.position(pos + 1);
+ continue;
+ }
- continue;
- }
+ if (recordType != DELETE_RECORD_TX)
+ {
+ userRecordType = wholeFileBuffer.get();
+ }
- int oldPos = wholeFileBuffer.position();
+ record = new byte[variableSize];
- wholeFileBuffer.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
+ wholeFileBuffer.get(record);
+ }
- int checkSize = wholeFileBuffer.getInt();
+ // Case this is a transaction, this will contain the number of pendingTransactions on a transaction, at the
+ // currentFile
+ int transactionCheckNumberOfRecords = 0;
- // VII - The checkSize at the end has to match with the size
- // informed at the beggining.
- // This is like testing a hash for the record. (We could replace the
- // checkSize by some sort of calculated hash)
- if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
- {
- trace("Record at position " + pos +
- " recordType = " +
- recordType +
- " file:" +
- file.getFile().getFileName() +
- " is corrupted and it is being ignored (III)");
+ if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
+ {
+ transactionCheckNumberOfRecords = wholeFileBuffer.getInt();
- // If a file has damaged pendingTransactions, we make it a dataFile, and the
- // next reclaiming will fix it
- reader.markAsDataFile(file);
+ if (recordType == PREPARE_RECORD)
+ {
+ // Add the variable size required for preparedTransactions
+ preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
+ }
+ variableSize = 0;
+ }
- wholeFileBuffer.position(pos + SIZE_BYTE);
+ int recordSize = getRecordSize(recordType);
- continue;
- }
+ // VI - this is completing V, We will validate the size at the end
+ // of the record,
+ // But we avoid buffer overflows by damaged data
+ if (isInvalidSize(pos, recordSize + variableSize + preparedTransactionExtraDataSize))
+ {
+ // Avoid a buffer overflow caused by damaged data... continue
+ // scanning for more pendingTransactions...
+ trace("Record at position " + pos +
+ " recordType = " +
+ recordType +
+ " file:" +
+ file.getFile().getFileName() +
+ " recordSize: " +
+ recordSize +
+ " variableSize: " +
+ variableSize +
+ " preparedTransactionExtraDataSize: " +
+ preparedTransactionExtraDataSize +
+ " is corrupted and it is being ignored (II)");
+ // If a file has damaged pendingTransactions, we make it a dataFile, and the
+ // next reclaiming will fix it
+ reader.markAsDataFile(file);
+ wholeFileBuffer.position(pos + 1);
- // This record is from a previous file-usage. The file was
- // reused and we need to ignore this record
- if (readFileId != file.getFileID())
- {
- // If a file has damaged pendingTransactions, we make it a dataFile, and the
- // next reclaiming will fix it
- reader.markAsDataFile(file);
+ continue;
+ }
- continue;
- }
+ int oldPos = wholeFileBuffer.position();
- wholeFileBuffer.position(oldPos);
+ wholeFileBuffer.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - DataConstants.SIZE_INT);
- // At this point everything is checked. So we relax and just load
- // the data now.
+ int checkSize = wholeFileBuffer.getInt();
- switch (recordType)
- {
- case ADD_RECORD:
+ // VII - The checkSize at the end has to match with the size
+ // informed at the beggining.
+ // This is like testing a hash for the record. (We could replace the
+ // checkSize by some sort of calculated hash)
+ if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
{
- reader.addRecord(new RecordInfo(recordID, userRecordType, record, false));
- break;
- }
+ trace("Record at position " + pos +
+ " recordType = " +
+ recordType +
+ " file:" +
+ file.getFile().getFileName() +
+ " is corrupted and it is being ignored (III)");
- case UPDATE_RECORD:
- {
- reader.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
- break;
- }
+ // If a file has damaged pendingTransactions, we make it a dataFile, and the
+ // next reclaiming will fix it
+ reader.markAsDataFile(file);
- case DELETE_RECORD:
- {
- reader.deleteRecord(recordID);
- break;
- }
+ wholeFileBuffer.position(pos + DataConstants.SIZE_BYTE);
- case ADD_RECORD_TX:
- {
- reader.addRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
- break;
+ continue;
}
- case UPDATE_RECORD_TX:
+ // This record is from a previous file-usage. The file was
+ // reused and we need to ignore this record
+ if (readFileId != file.getFileID())
{
- reader.updateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
- break;
- }
+ // If a file has damaged pendingTransactions, we make it a dataFile, and the
+ // next reclaiming will fix it
+ reader.markAsDataFile(file);
- case DELETE_RECORD_TX:
- {
- reader.deleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
- break;
+ continue;
}
- case PREPARE_RECORD:
+ wholeFileBuffer.position(oldPos);
+
+ // At this point everything is checked. So we relax and just load
+ // the data now.
+
+ switch (recordType)
{
+ case ADD_RECORD:
+ {
+ reader.addRecord(new RecordInfo(recordID, userRecordType, record, false));
+ break;
+ }
- byte extraData[] = new byte[preparedTransactionExtraDataSize];
+ case UPDATE_RECORD:
+ {
+ reader.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
+ break;
+ }
- wholeFileBuffer.get(extraData);
+ case DELETE_RECORD:
+ {
+ reader.deleteRecord(recordID);
+ break;
+ }
- reader.prepareRecord(transactionID, extraData, transactionCheckNumberOfRecords);
+ case ADD_RECORD_TX:
+ {
+ reader.addRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
+ break;
+ }
- break;
- }
- case COMMIT_RECORD:
- {
+ case UPDATE_RECORD_TX:
+ {
+ reader.updateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
+ break;
+ }
- reader.commitRecord(transactionID, transactionCheckNumberOfRecords);
- break;
+ case DELETE_RECORD_TX:
+ {
+ reader.deleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
+ break;
+ }
+
+ case PREPARE_RECORD:
+ {
+
+ byte extraData[] = new byte[preparedTransactionExtraDataSize];
+
+ wholeFileBuffer.get(extraData);
+
+ reader.prepareRecord(transactionID, extraData, transactionCheckNumberOfRecords);
+
+ break;
+ }
+ case COMMIT_RECORD:
+ {
+
+ reader.commitRecord(transactionID, transactionCheckNumberOfRecords);
+ break;
+ }
+ case ROLLBACK_RECORD:
+ {
+ reader.rollbackRecord(transactionID);
+ break;
+ }
+ default:
+ {
+ throw new IllegalStateException("Journal " + file.getFile().getFileName() +
+ " is corrupt, invalid record type " +
+ recordType);
+ }
}
- case ROLLBACK_RECORD:
+
+ checkSize = wholeFileBuffer.getInt();
+
+ // This is a sanity check about the loading code itself.
+ // If this checkSize doesn't match, it means the reading method is
+ // not doing what it was supposed to do
+ if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
{
- reader.rollbackRecord(transactionID);
- break;
+ throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() +
+ ", pos = " +
+ pos);
}
- default:
- {
- throw new IllegalStateException("Journal " + file.getFile().getFileName() +
- " is corrupt, invalid record type " +
- recordType);
- }
- }
- checkSize = wholeFileBuffer.getInt();
+ lastDataPos = wholeFileBuffer.position();
- // This is a sanity check about the loading code itself.
- // If this checkSize doesn't match, it means the reading method is
- // not doing what it was supposed to do
- if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
- {
- throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() +
- ", pos = " +
- pos);
}
- lastDataPos = wholeFileBuffer.position();
-
+ return lastDataPos;
}
- fileFactory.releaseBuffer(wholeFileBuffer);
+ finally
+ {
+ if (wholeFileBuffer != null)
+ {
+ fileFactory.releaseBuffer(wholeFileBuffer);
+ }
- file.getFile().close();
-
- return lastDataPos;
-
+ try
+ {
+ file.getFile().close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
}
/**
@@ -2432,7 +2168,7 @@
* @return
* @throws Exception
*/
- private void writeTransaction(final int fileID,
+ void writeTransaction(final int fileID,
final byte recordType,
final long txID,
final JournalTransaction tx,
@@ -2467,7 +2203,7 @@
* @param size
* @param bb
*/
- private void writeUpdateRecordTX(final int fileID,
+ void writeUpdateRecordTX(final int fileID,
final long txID,
final long id,
final byte recordType,
@@ -2492,7 +2228,7 @@
* @param size
* @param bb
*/
- private void writeUpdateRecord(final int fileId,
+ void writeUpdateRecord(final int fileId,
final long id,
final byte recordType,
final EncodingSupport record,
@@ -2515,7 +2251,7 @@
* @param size
* @param bb
*/
- private void writeAddRecord(final int fileId,
+ void writeAddRecord(final int fileId,
final long id,
final byte recordType,
final EncodingSupport record,
@@ -2538,7 +2274,7 @@
* @param size
* @param bb
*/
- private void writeDeleteRecordTransactional(final int fileID,
+ void writeDeleteRecordTransactional(final int fileID,
final long txID,
final long id,
final EncodingSupport record,
@@ -2566,7 +2302,7 @@
* @param size
* @param bb
*/
- private void writeAddRecordTX(final int fileID,
+ void writeAddRecordTX(final int fileID,
final long txID,
final long id,
final byte recordType,
@@ -2748,14 +2484,14 @@
// The callback of a transaction has to be taken inside the lock,
// when we guarantee the currentFile will not be changed,
// since we individualize the callback per file
- callback = tx.getCallback(currentFile);
+ callback = tx.getCallback(fileFactory.isSupportsCallbacks(), currentFile);
if (sync)
{
// 99 % of the times this will be already synced, as previous files should be closed already.
// This is to have 100% guarantee the transaction will be persisted and no loss of information would
// happen
- tx.syncPreviousFiles();
+ tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
}
// We need to add the number of records on currentFile if prepare or commit
@@ -2767,7 +2503,7 @@
}
// Adding fileID
- bb.writerIndex(SIZE_BYTE);
+ bb.writerIndex(DataConstants.SIZE_BYTE);
bb.writeInt(currentFile.getFileID());
if (callback != null)
@@ -2946,7 +2682,7 @@
* @return
* @throws Exception
*/
- private JournalFile getFile(boolean keepOpened, boolean multiAIO) throws Exception
+ public JournalFile getFile(boolean keepOpened, boolean multiAIO) throws Exception
{
JournalFile nextOpenedFile = null;
try
@@ -2998,7 +2734,7 @@
if (tx == null)
{
- tx = new JournalTransaction();
+ tx = new JournalTransaction(this);
JournalTransaction trans = pendingTransactions.putIfAbsent(txID, tx);
@@ -3038,53 +2774,12 @@
// Inner classes
// ---------------------------------------------------------------------------
- // Just encapsulates the VariableLatch waiting for transaction completions
- // Used if the SequentialFile supports Callbacks
- private static class TransactionCallback implements IOCallback
- {
- private final VariableLatch countLatch = new VariableLatch();
-
- private volatile String errorMessage = null;
-
- private volatile int errorCode = 0;
-
- public void countUp()
- {
- countLatch.up();
- }
-
- public void done()
- {
- countLatch.down();
- }
-
- public void waitCompletion() throws InterruptedException
- {
- countLatch.waitCompletion();
-
- if (errorMessage != null)
- {
- throw new IllegalStateException("Error on Transaction: " + errorCode + " - " + errorMessage);
- }
- }
-
- public void onError(final int errorCode, final String errorMessage)
- {
- this.errorMessage = errorMessage;
-
- this.errorCode = errorCode;
-
- countLatch.down();
- }
-
- }
-
/**
* This holds the relationship a record has with other files in regard to reference counting.
* Note: This class used to be called PosFiles
*
* Used on the ref-count for reclaiming */
- private static class JournalRecord
+ public static class JournalRecord
{
private final JournalFile addFile;
@@ -3143,278 +2838,6 @@
}
}
- private class JournalTransaction
- {
- private List<Pair<JournalFile, Long>> pos;
-
- private List<Pair<JournalFile, Long>> neg;
-
- // All the files this transaction is touching on.
- // We can't have those files being reclaimed or compacted if there is a pending transaction
- private Set<JournalFile> pendingFiles;
-
- private TransactionCallback currentCallback;
-
- private Map<JournalFile, TransactionCallback> callbackList;
-
- private JournalFile lastFile = null;
-
- private final AtomicInteger counter = new AtomicInteger();
-
- private AtomicInteger internalgetCounter(final JournalFile file)
- {
- if (lastFile != file)
- {
- lastFile = file;
- counter.set(0);
- }
- return counter;
- }
-
- public int getCounter(final JournalFile file)
- {
- return internalgetCounter(file).intValue();
- }
-
- public void incCounter(final JournalFile file)
- {
- internalgetCounter(file).incrementAndGet();
- }
-
- /**
- * @param currentFile
- * @param bb
- */
- public void fillNumberOfRecords(JournalFile currentFile, MessagingBuffer bb)
- {
- bb.writerIndex(SIZE_BYTE + SIZE_INT + SIZE_LONG);
-
- bb.writeInt(getCounter(currentFile));
-
- }
-
- /** 99.99 % of the times previous files will be already synced, since they are scheduled to be closed.
- * Because of that, this operation should be almost very fast.*/
- public void syncPreviousFiles() throws Exception
- {
- if (fileFactory.isSupportsCallbacks())
- {
- if (callbackList != null)
- {
- for (Map.Entry<JournalFile, TransactionCallback> entry : callbackList.entrySet())
- {
- if (entry.getKey() != currentFile)
- {
- entry.getValue().waitCompletion();
- }
- }
- }
- }
- else
- {
- for (JournalFile file : pendingFiles)
- {
- if (file != currentFile)
- {
- file.getFile().waitForClose();
- }
- }
- }
- }
-
- /**
- * @return
- */
- public TransactionCallback getCallback(JournalFile file) throws Exception
- {
- if (fileFactory.isSupportsCallbacks())
- {
- if (callbackList == null)
- {
- callbackList = new HashMap<JournalFile, TransactionCallback>();
- }
-
- currentCallback = callbackList.get(file);
-
- if (currentCallback == null)
- {
- currentCallback = new TransactionCallback();
- callbackList.put(file, currentCallback);
- }
-
- if (currentCallback.errorMessage != null)
- {
- throw new MessagingException(currentCallback.errorCode, currentCallback.errorMessage);
- }
-
- currentCallback.countUp();
-
- return currentCallback;
- }
- else
- {
- return null;
- }
- }
-
- public void addPositive(final JournalFile file, final long id)
- {
- incCounter(file);
-
- addFile(file);
-
- if (pos == null)
- {
- pos = new ArrayList<Pair<JournalFile, Long>>();
- }
-
- pos.add(new Pair<JournalFile, Long>(file, id));
- }
-
- public void addNegative(final JournalFile file, final long id)
- {
- incCounter(file);
-
- addFile(file);
-
- if (neg == null)
- {
- neg = new ArrayList<Pair<JournalFile, Long>>();
- }
-
- neg.add(new Pair<JournalFile, Long>(file, id));
- }
-
- /**
- * The caller of this method needs to guarantee lock.acquire. (unless this is being called from load what is a single thread process).
- * */
- public void commit(final JournalFile file)
- {
- if (pos != null)
- {
- for (Pair<JournalFile, Long> p : pos)
- {
- JournalRecord posFiles = records.get(p.b);
-
- if (posFiles == null)
- {
- posFiles = new JournalRecord(p.a);
-
- records.put(p.b, posFiles);
- }
- else
- {
- posFiles.addUpdateFile(p.a);
- }
- }
- }
-
- if (neg != null)
- {
- for (Pair<JournalFile, Long> n : neg)
- {
- JournalRecord posFiles = records.remove(n.b);
-
- if (posFiles != null)
- {
- posFiles.delete(n.a);
- }
- }
- }
-
- // Now add negs for the pos we added in each file in which there were
- // transactional operations
-
- for (JournalFile jf : pendingFiles)
- {
- file.incNegCount(jf);
- }
- }
-
- public void waitCallbacks() throws Exception
- {
- if (callbackList != null)
- {
- for (TransactionCallback callback : callbackList.values())
- {
- callback.waitCompletion();
- }
- }
- }
-
- /** Wait completion at the latest file only */
- public void waitCompletion() throws Exception
- {
- if (currentCallback != null)
- {
- currentCallback.waitCompletion();
- }
- }
-
- /**
- * The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
- * or else potFilesMap could be affected
- * */
- public void rollback(final JournalFile file)
- {
- // Now add negs for the pos we added in each file in which there were
- // transactional operations
- // Note that we do this on rollback as we do on commit, since we need
- // to ensure the file containing
- // the rollback record doesn't get deleted before the files with the
- // transactional operations are deleted
- // Otherwise we may run into problems especially with XA where we are
- // just left with a prepare when the tx
- // has actually been rolled back
-
- for (JournalFile jf : pendingFiles)
- {
- file.incNegCount(jf);
- }
- }
-
- /**
- * The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
- * or else potFilesMap could be affected
- * */
- public void prepare(final JournalFile file)
- {
- // We don't want the prepare record getting deleted before time
-
- addFile(file);
- }
-
- /** Used by load, when the transaction was not loaded correctly */
- public void forget()
- {
- // The transaction was not committed or rolled back in the file, so we
- // reverse any pos counts we added
- for (JournalFile jf : pendingFiles)
- {
- jf.decPosCount();
- }
-
- }
-
- private void addFile(final JournalFile file)
- {
- if (pendingFiles == null)
- {
- pendingFiles = new HashSet<JournalFile>();
- }
-
- if (!pendingFiles.contains(file))
- {
- pendingFiles.add(file);
-
- // We add a pos for the transaction itself in the file - this
- // prevents any transactional operations
- // being deleted before a commit or rollback is written
- file.incPosCount();
- }
- }
- }
-
private static class NullEncoding implements EncodingSupport
{
@@ -3435,7 +2858,7 @@
}
- private static class ByteArrayEncoding implements EncodingSupport
+ public static class ByteArrayEncoding implements EncodingSupport
{
final byte[] data;
Added: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java (rev 0)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java 2009-06-29 18:02:57 UTC (rev 7496)
@@ -0,0 +1,326 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.journal.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.DataConstants;
+import org.jboss.messaging.utils.Pair;
+
+/**
+ * A JournalTransaction
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalTransaction
+{
+
+ private JournalImpl journal;
+
+ private List<Pair<JournalFile, Long>> pos;
+
+ private List<Pair<JournalFile, Long>> neg;
+
+ // All the files this transaction is touching on.
+ // We can't have those files being reclaimed or compacted if there is a pending transaction
+ private Set<JournalFile> pendingFiles;
+
+ private TransactionCallback currentCallback;
+
+ private Map<JournalFile, TransactionCallback> callbackList;
+
+ private JournalFile lastFile = null;
+
+ private final AtomicInteger counter = new AtomicInteger();
+
+
+ public JournalTransaction(JournalImpl journal)
+ {
+ this.journal = journal;
+ }
+
+ public int getCounter(final JournalFile file)
+ {
+ return internalgetCounter(file).intValue();
+ }
+
+ public void incCounter(final JournalFile file)
+ {
+ internalgetCounter(file).incrementAndGet();
+ }
+
+ /**
+ * @param currentFile
+ * @param bb
+ */
+ public void fillNumberOfRecords(JournalFile currentFile, MessagingBuffer bb)
+ {
+ bb.writerIndex(DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG);
+
+ bb.writeInt(getCounter(currentFile));
+
+ }
+
+ /** 99.99 % of the times previous files will be already synced, since they are scheduled to be closed.
+ * Because of that, this operation should be almost very fast.*/
+ public void syncPreviousFiles(boolean callbacks, JournalFile currentFile) throws Exception
+ {
+ if (callbacks)
+ {
+ if (callbackList != null)
+ {
+ for (Map.Entry<JournalFile, TransactionCallback> entry : callbackList.entrySet())
+ {
+ if (entry.getKey() != currentFile)
+ {
+ entry.getValue().waitCompletion();
+ }
+ }
+ }
+ }
+ else
+ {
+ for (JournalFile file : pendingFiles)
+ {
+ if (file != currentFile)
+ {
+ file.getFile().waitForClose();
+ }
+ }
+ }
+ }
+
+ /**
+ * @return
+ */
+ public TransactionCallback getCallback(boolean callbacks, JournalFile file) throws Exception
+ {
+ if (callbacks)
+ {
+ if (callbackList == null)
+ {
+ callbackList = new HashMap<JournalFile, TransactionCallback>();
+ }
+
+ currentCallback = callbackList.get(file);
+
+ if (currentCallback == null)
+ {
+ currentCallback = new TransactionCallback();
+ callbackList.put(file, currentCallback);
+ }
+
+ if (currentCallback.getErrorMessage() != null)
+ {
+ throw new MessagingException(currentCallback.getErrorCode(), currentCallback.getErrorMessage());
+ }
+
+ currentCallback.countUp();
+
+ return currentCallback;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void addPositive(final JournalFile file, final long id)
+ {
+ incCounter(file);
+
+ addFile(file);
+
+ if (pos == null)
+ {
+ pos = new ArrayList<Pair<JournalFile, Long>>();
+ }
+
+ pos.add(new Pair<JournalFile, Long>(file, id));
+ }
+
+ public void addNegative(final JournalFile file, final long id)
+ {
+ incCounter(file);
+
+ addFile(file);
+
+ if (neg == null)
+ {
+ neg = new ArrayList<Pair<JournalFile, Long>>();
+ }
+
+ neg.add(new Pair<JournalFile, Long>(file, id));
+ }
+
+ /**
+ * The caller of this method needs to guarantee lock.acquire. (unless this is being called from load what is a single thread process).
+ * */
+ public void commit(final JournalFile file)
+ {
+ if (pos != null)
+ {
+ for (Pair<JournalFile, Long> p : pos)
+ {
+ JournalImpl.JournalRecord posFiles = journal.getRecords().get(p.b);
+
+ if (posFiles == null)
+ {
+ posFiles = new JournalImpl.JournalRecord(p.a);
+
+ journal.getRecords().put(p.b, posFiles);
+ }
+ else
+ {
+ posFiles.addUpdateFile(p.a);
+ }
+ }
+ }
+
+ if (neg != null)
+ {
+ for (Pair<JournalFile, Long> n : neg)
+ {
+ JournalImpl.JournalRecord posFiles = journal.getRecords().remove(n.b);
+
+ if (posFiles != null)
+ {
+ posFiles.delete(n.a);
+ }
+ }
+ }
+
+ // Now add negs for the pos we added in each file in which there were
+ // transactional operations
+
+ for (JournalFile jf : pendingFiles)
+ {
+ file.incNegCount(jf);
+ }
+ }
+
+ public void waitCallbacks() throws Exception
+ {
+ if (callbackList != null)
+ {
+ for (TransactionCallback callback : callbackList.values())
+ {
+ callback.waitCompletion();
+ }
+ }
+ }
+
+ /** Wait completion at the latest file only */
+ public void waitCompletion() throws Exception
+ {
+ if (currentCallback != null)
+ {
+ currentCallback.waitCompletion();
+ }
+ }
+
+ /**
+ * The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
+ * or else potFilesMap could be affected
+ * */
+ public void rollback(final JournalFile file)
+ {
+ // Now add negs for the pos we added in each file in which there were
+ // transactional operations
+ // Note that we do this on rollback as we do on commit, since we need
+ // to ensure the file containing
+ // the rollback record doesn't get deleted before the files with the
+ // transactional operations are deleted
+ // Otherwise we may run into problems especially with XA where we are
+ // just left with a prepare when the tx
+ // has actually been rolled back
+
+ for (JournalFile jf : pendingFiles)
+ {
+ file.incNegCount(jf);
+ }
+ }
+
+ /**
+ * The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
+ * or else potFilesMap could be affected
+ * */
+ public void prepare(final JournalFile file)
+ {
+ // We don't want the prepare record getting deleted before time
+
+ addFile(file);
+ }
+
+ /** Used by load, when the transaction was not loaded correctly */
+ public void forget()
+ {
+ // The transaction was not committed or rolled back in the file, so we
+ // reverse any pos counts we added
+ for (JournalFile jf : pendingFiles)
+ {
+ jf.decPosCount();
+ }
+
+ }
+
+ private AtomicInteger internalgetCounter(final JournalFile file)
+ {
+ if (lastFile != file)
+
+ {
+ lastFile = file;
+ counter.set(0);
+ }
+ return counter;
+ }
+
+ private void addFile(final JournalFile file)
+ {
+ if (pendingFiles == null)
+ {
+ pendingFiles = new HashSet<JournalFile>();
+ }
+
+ if (!pendingFiles.contains(file))
+ {
+ pendingFiles.add(file);
+
+ // We add a pos for the transaction itself in the file - this
+ // prevents any transactional operations
+ // being deleted before a commit or rollback is written
+ file.incPosCount();
+ }
+ }
+}
Added: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/TransactionCallback.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/TransactionCallback.java (rev 0)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/TransactionCallback.java 2009-06-29 18:02:57 UTC (rev 7496)
@@ -0,0 +1,92 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.journal.impl;
+
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.utils.VariableLatch;
+
+/**
+ * A TransactionCallback
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class TransactionCallback implements IOCallback
+{
+ private final VariableLatch countLatch = new VariableLatch();
+
+ private volatile String errorMessage = null;
+
+ private volatile int errorCode = 0;
+
+ public void countUp()
+ {
+ countLatch.up();
+ }
+
+ public void done()
+ {
+ countLatch.down();
+ }
+
+ public void waitCompletion() throws InterruptedException
+ {
+ countLatch.waitCompletion();
+
+ if (errorMessage != null)
+ {
+ throw new IllegalStateException("Error on Transaction: " + errorCode + " - " + errorMessage);
+ }
+ }
+
+ public void onError(final int errorCode, final String errorMessage)
+ {
+ this.errorMessage = errorMessage;
+
+ this.errorCode = errorCode;
+
+ countLatch.down();
+ }
+
+ /**
+ * @return the errorMessage
+ */
+ public String getErrorMessage()
+ {
+ return errorMessage;
+ }
+
+ /**
+ * @return the errorCode
+ */
+ public int getErrorCode()
+ {
+ return errorCode;
+ }
+
+
+
+
+}
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java 2009-06-29 14:55:19 UTC (rev 7495)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java 2009-06-29 18:02:57 UTC (rev 7496)
@@ -64,36 +64,39 @@
public void testCompactwithPendingCommit() throws Exception
{
- InternalCompactTest(false, false, false, true);
+ InternalCompactTest(false, false, false, false, true);
}
public void testCompactwithConcurrentUpdateAndDeletes() throws Exception
{
- InternalCompactTest(false, true, true, true);
+ InternalCompactTest(true, false, true, true, false);
}
-
public void testCompactwithConcurrentDeletes() throws Exception
{
- InternalCompactTest(false, false, true, true);
+ InternalCompactTest(true, false, false, true, false);
}
public void testCompactwithConcurrentUpdates() throws Exception
{
- InternalCompactTest(false, true, false, true);
+ InternalCompactTest(true, false, true, false, false);
}
public void testCompactWithConcurrentAppend() throws Exception
{
- InternalCompactTest(true, false, false, true);
+ InternalCompactTest(true, true, false, false, false);
}
- private void InternalCompactTest(final boolean performAppend, final boolean performUpdate, final boolean performDelete, final boolean pendingTransactions) throws Exception
+ private void InternalCompactTest(final boolean regularAdd,
+ final boolean performAppend,
+ final boolean performUpdate,
+ final boolean performDelete,
+ final boolean pendingTransactions) throws Exception
{
setup(50, 60 * 1024, true);
ArrayList<Long> liveIDs = new ArrayList<Long>();
-
+
ArrayList<Long> listPendingTransactions = new ArrayList<Long>();
final CountDownLatch latchDone = new CountDownLatch(1);
@@ -121,32 +124,37 @@
long transactionID = 0;
- for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
+ if (regularAdd)
{
- add(i);
- if (i % 10 == 0 && i > 0)
+
+
+ for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
{
- journal.forceMoveNextFile();
+ add(i);
+ if (i % 10 == 0 && i > 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ update(i);
}
- update(i);
- }
- for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
- {
+ for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
+ {
- addTx(transactionID, i);
- updateTx(transactionID, i);
- if (i % 10 == 0)
- {
- journal.forceMoveNextFile();
+ addTx(transactionID, i);
+ updateTx(transactionID, i);
+ if (i % 10 == 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ commit(transactionID++);
+ update(i);
}
- commit(transactionID++);
- update(i);
}
-
+
if (pendingTransactions)
{
- for (long i = 0; i < 100 ; i++)
+ for (long i = 0; i < 100; i++)
{
addTx(transactionID, idGenerator.generateID());
updateTx(transactionID, idGenerator.generateID());
@@ -156,16 +164,19 @@
System.out.println("Number of Files: " + journal.getDataFilesCount());
- for (int i = 0; i < NUMBER_OF_RECORDS; i++)
+ if (regularAdd)
{
- if (!(i % 10 == 0))
+ for (int i = 0; i < NUMBER_OF_RECORDS; i++)
{
- delete(i);
+ if (!(i % 10 == 0))
+ {
+ delete(i);
+ }
+ else
+ {
+ liveIDs.add((long)i);
+ }
}
- else
- {
- liveIDs.add((long)i);
- }
}
journal.forceMoveNextFile();
@@ -211,7 +222,7 @@
update(liveID);
}
}
-
+
if (performDelete)
{
for (long liveID : liveIDs)
@@ -219,7 +230,7 @@
delete(liveID);
}
}
-
+
if (pendingTransactions)
{
for (long tx : listPendingTransactions)
@@ -256,7 +267,7 @@
add(idGenerator.generateID());
- journal.compact();
+ //journal.compact();
stopJournal();
createJournal();
More information about the jboss-cvs-commits
mailing list