[jboss-cvs] JBoss Messaging SVN: r7473 - branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jun 25 17:42:56 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-25 17:42:56 -0400 (Thu, 25 Jun 2009)
New Revision: 7473
Modified:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
Log:
changes
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-25 18:39:32 UTC (rev 7472)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-25 21:42:56 UTC (rev 7473)
@@ -194,7 +194,7 @@
// Compacting may replace this structure
private volatile ConcurrentMap<Long, JournalTransaction> pendingTransactions = new ConcurrentHashMap<Long, JournalTransaction>();
-
+
// This will be filled only while the Compactor is being done
private volatile Compactor compactor;
@@ -202,13 +202,9 @@
private final Semaphore lock = new Semaphore(1);
+ /** We don't lock the journal while compacting, however we need to lock it while taking and updating snapshots */
private final ReadWriteLock compactingLock = new ReentrantReadWriteLock();
- /** We don't lock the journal while compacting, however we need to lock it while taking and updating snapshots */
- private final Lock readLockCompact = compactingLock.readLock();
-
- private final Lock writeLockCompact = compactingLock.writeLock();
-
private volatile JournalFile currentFile;
private volatile int state;
@@ -285,7 +281,7 @@
IOCallback callback = null;
- readLockCompact.lock();
+ compactingLock.readLock().lock();
try
{
@@ -311,7 +307,7 @@
}
finally
{
- readLockCompact.unlock();
+ compactingLock.readLock().unlock();
}
if (callback != null)
@@ -334,7 +330,7 @@
IOCallback callback = null;
- readLockCompact.lock();
+ compactingLock.readLock().lock();
try
{
@@ -374,7 +370,7 @@
}
finally
{
- readLockCompact.unlock();
+ compactingLock.readLock().unlock();
}
if (callback != null)
@@ -390,7 +386,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- readLockCompact.lock();
+ compactingLock.readLock().lock();
IOCallback callback = null;
@@ -429,7 +425,7 @@
}
finally
{
- readLockCompact.unlock();
+ compactingLock.readLock().unlock();
}
if (callback != null)
@@ -454,7 +450,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- readLockCompact.lock();
+ compactingLock.readLock().lock();
try
{
@@ -481,7 +477,7 @@
}
finally
{
- readLockCompact.unlock();
+ compactingLock.readLock().unlock();
}
}
@@ -503,7 +499,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- readLockCompact.lock();
+ compactingLock.readLock().lock();
try
{
@@ -537,7 +533,7 @@
}
finally
{
- readLockCompact.unlock();
+ compactingLock.readLock().unlock();
}
}
@@ -553,24 +549,15 @@
throw new IllegalStateException("Journal must be loaded first");
}
- readLockCompact.lock();
+ compactingLock.readLock().lock();
try
{
- int size = SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0);
+ int size = SIZE_DELETE_RECORD_TX + record.getEncodeSize();
ChannelBuffer bb = newBuffer(size);
- bb.writeByte(DELETE_RECORD_TX);
- bb.writeInt(-1); // skip ID part
- bb.writeLong(txID);
- bb.writeLong(id);
- bb.writeInt(record != null ? record.getEncodeSize() : 0);
- if (record != null)
- {
- record.encode(bb);
- }
- bb.writeInt(size);
+ writeDeleteRecordTransactional(-1, txID, id, record, size, bb);
JournalTransaction tx = getTransactionInfo(txID);
@@ -588,51 +575,13 @@
}
finally
{
- readLockCompact.unlock();
+ compactingLock.readLock().unlock();
}
}
public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
{
- if (state != STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
-
- readLockCompact.lock();
-
- try
- {
- int size = SIZE_DELETE_RECORD_TX;
-
- ChannelBuffer bb = newBuffer(size);
-
- bb.writeByte(DELETE_RECORD_TX);
- bb.writeInt(-1); // skip ID part
- bb.writeLong(txID);
- bb.writeLong(id);
- bb.writeInt(0);
- bb.writeInt(size);
-
- JournalTransaction tx = getTransactionInfo(txID);
-
- lock.acquire();
- try
- {
- JournalFile usedFile = appendRecord(bb, false, false, tx, null);
-
- tx.addNegative(usedFile, id);
- }
- finally
- {
- lock.release();
- }
- }
- finally
- {
- readLockCompact.unlock();
- }
-
+ appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
}
/**
@@ -662,7 +611,7 @@
tx.syncPreviousFiles();
}
- readLockCompact.lock();
+ compactingLock.readLock().lock();
try
{
@@ -687,7 +636,7 @@
}
finally
{
- readLockCompact.unlock();
+ compactingLock.readLock().unlock();
}
// We should wait this outside of the lock, to increase throughput
@@ -720,7 +669,7 @@
JournalTransaction tx = pendingTransactions.remove(txID);
- readLockCompact.lock();
+ compactingLock.readLock().lock();
try
{
@@ -749,7 +698,7 @@
}
finally
{
- readLockCompact.unlock();
+ compactingLock.readLock().unlock();
}
if (sync)
@@ -766,7 +715,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- readLockCompact.lock();
+ compactingLock.readLock().lock();
JournalTransaction tx = null;
@@ -803,7 +752,7 @@
}
finally
{
- readLockCompact.unlock();
+ compactingLock.readLock().unlock();
}
// We should wait this outside of the lock, to increase throughput
@@ -868,7 +817,7 @@
ConcurrentMap<Long, JournalRecord> recordsSnapshot = null;
ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(dataFiles.size());
-
+
Map<Long, JournalTransaction> pendingTransactions;
boolean previousReclaimValue = autoReclaim;
@@ -879,7 +828,7 @@
// First, we replace the records by a new one.
// We need to guarantee that the journal is frozen for this short time
// We don't freeze the journal as we compact, only for the short time where we replace records
- writeLockCompact.lock();
+ compactingLock.writeLock().lock();
try
{
autoReclaim = false;
@@ -902,16 +851,13 @@
break;
}
}
-
-
this.compactor = new Compactor(recordsSnapshot, pendingTransactions, dataFilesToProcess.get(0).getFileID());
-
}
finally
{
- writeLockCompact.unlock();
+ compactingLock.writeLock().unlock();
}
}
@@ -925,9 +871,9 @@
readJournalFile(file, compactor);
}
- compactor.flushBuffer();
+ compactor.flush();
- writeLockCompact.lock();
+ compactingLock.writeLock().lock();
try
{
// Restore relationshipMap
@@ -936,7 +882,7 @@
}
finally
{
- writeLockCompact.unlock();
+ compactingLock.writeLock().unlock();
}
}
@@ -956,14 +902,16 @@
int nextOrderingID;
final Map<Long, JournalRecord> recordsSnapshot;
+
final Map<Long, JournalTransaction> pendingTransactions;
-
+
final Map<Long, JournalRecord> newRecords = new HashMap<Long, JournalRecord>();
+
final Map<Long, JournalTransaction> newTransactions = new HashMap<Long, JournalTransaction>();
-
-
- public Compactor(Map<Long, JournalRecord> recordsSnapshot, Map<Long, JournalTransaction> pendingTransactions, int firstFileID)
+ public Compactor(Map<Long, JournalRecord> recordsSnapshot,
+ Map<Long, JournalTransaction> pendingTransactions,
+ int firstFileID)
{
this.recordsSnapshot = recordsSnapshot;
this.nextOrderingID = firstFileID;
@@ -985,7 +933,7 @@
}
}
- public void flushBuffer() throws Exception
+ public void flush() throws Exception
{
if (bufferWrite != null)
{
@@ -1002,7 +950,7 @@
*/
private void openFile() throws Exception
{
- flushBuffer();
+ flush();
bufferWrite = fileFactory.newBuffer(fileSize);
channelWrapper = ChannelBuffers.wrappedBuffer(bufferWrite);
@@ -1032,25 +980,23 @@
new ByteArrayEncoding(info.data),
size,
channelWrapper);
-
+
newRecords.put(info.id, new JournalRecord(currentFile));
}
}
public void addRecordTX(long transactionID, RecordInfo info) throws Exception
{
- JournalTransaction pending = pendingTransactions.get(transactionID);
-
- if (pending != null)
+ if (pendingTransactions.get(transactionID) != null)
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
-
+
int size = SIZE_ADD_RECORD_TX + info.data.length;
-
+
checkSize(size);
-
+
newTransaction.addPositive(currentFile, info.id);
-
+
writeAddRecordTX(fileID,
transactionID,
info.id,
@@ -1059,10 +1005,8 @@
size,
channelWrapper);
}
- else
- if (recordsSnapshot.get(info.id) != null)
+ else if (recordsSnapshot.get(info.id) != null)
{
- System.out.println("AddRecordTX for a committed record, just converting it as a regular record");
// AddRecordTX for a committed record, just converting it as a regular record
// The record is already confirmed. There is no need to keep the transaction information during compacting
addRecord(info);
@@ -1071,28 +1015,47 @@
public void commitRecord(long transactionID, int numberOfRecords) throws Exception
{
- // Even though this shouldn't happen, I'm processing the commit as it was legal (instead of throwing it away)
JournalTransaction pendingTx = pendingTransactions.get(transactionID);
-
+
if (pendingTx != null)
{
- log.warn("A commit record for a pending transaction is being read on compactor. This shouldn't happen");
-
- JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- checkSize(SIZE_COMMIT_RECORD);
- writeTransaction(fileID, COMMIT_RECORD, transactionID, newTransaction, null, SIZE_COMMIT_RECORD, pendingTx.getCounter(currentFile), channelWrapper);
- newTransaction.commit(currentFile);
+ // Sanity check, this should never happen
+ throw new IllegalStateException("Inconsistency during compacting: CommitRecord ID = " + transactionID +
+ " for an already committed transaction during compacting");
}
}
public void deleteRecord(long recordID) throws Exception
{
- // nothing to be done here
+ // nothing to be done here, if it is a delete, the record is already gone.. so.. no worries
+
+ if (records.get(recordID) != null)
+ {
+ // Sanity check, it should never happen
+ throw new IllegalStateException("Inconsistency during compacting: Delete record being read on an existent record");
+ }
+
}
public void deleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
{
- // nothing to be done here
+ if (pendingTransactions.get(transactionID) != null)
+ {
+ JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+
+ int size = SIZE_DELETE_RECORD_TX + recordInfo.data.length;
+
+ checkSize(size);
+
+ writeDeleteRecordTransactional(fileID,
+ transactionID,
+ recordInfo.id,
+ new ByteArrayEncoding(recordInfo.data),
+ size,
+ channelWrapper);
+
+ newTransaction.addNegative(currentFile, recordInfo.id);
+ }
}
public void markAsDataFile(JournalFile file)
@@ -1104,12 +1067,35 @@
{
if (pendingTransactions.get(transactionID) != null)
{
-
+
+ JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+
+ int size = SIZE_COMPLETE_TRANSACTION_RECORD + extraData.length + SIZE_INT;
+
+ checkSize(size);
+
+ writeTransaction(fileID,
+ PREPARE_RECORD,
+ transactionID,
+ newTransaction,
+ new ByteArrayEncoding(extraData),
+ size,
+ newTransaction.getCounter(currentFile),
+ channelWrapper);
+
+ newTransaction.prepare(currentFile);
+
}
}
public void rollbackRecord(long transactionID) throws Exception
{
+ if (pendingTransactions.get(transactionID) != null)
+ {
+ // Sanity check, this should never happen
+ throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
+ " for an already rolled back transaction during compacting");
+ }
}
public void updateRecord(RecordInfo info) throws Exception
@@ -1127,7 +1113,6 @@
System.out.println("UpdateTX " + info.id + " to be out on compacted file");
}
}
-
/**
* @param transactionID
@@ -1143,7 +1128,6 @@
}
return newTransaction;
}
-
}
@@ -2019,7 +2003,8 @@
wholeFileBuffer.get(record);
}
- // Case this is a transaction, this will contain the number of pendingTransactions on a transaction, at the currentFile
+ // Case this is a transaction, this will contain the number of pendingTransactions on a transaction, at the
+ // currentFile
int transactionCheckNumberOfRecords = 0;
if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
@@ -2298,6 +2283,32 @@
/**
* @param txID
* @param id
+ * @param record
+ * @param size
+ * @param bb
+ */
+ private void writeDeleteRecordTransactional(final int fileID,
+ final long txID,
+ final long id,
+ final EncodingSupport record,
+ int size,
+ ChannelBuffer bb)
+ {
+ bb.writeByte(DELETE_RECORD_TX);
+ bb.writeInt(fileID);
+ bb.writeLong(txID);
+ bb.writeLong(id);
+ bb.writeInt(record != null ? record.getEncodeSize() : 0);
+ if (record != null)
+ {
+ record.encode(bb);
+ }
+ bb.writeInt(size);
+ }
+
+ /**
+ * @param txID
+ * @param id
* @param recordType
* @param record
* @param recordLength
@@ -2727,6 +2738,8 @@
private void closeFile(final JournalFile file)
{
fileFactory.deactivate(file.getFile());
+ dataFiles.add(file);
+
filesExecutor.execute(new Runnable()
{
public void run()
@@ -2739,7 +2752,6 @@
{
log.warn(e.getMessage(), e);
}
- dataFiles.add(file);
}
});
}
@@ -3172,6 +3184,26 @@
}
}
+ private static class NullEncoding implements EncodingSupport
+ {
+
+ static NullEncoding instance = new NullEncoding();
+
+ public void decode(MessagingBuffer buffer)
+ {
+ }
+
+ public void encode(MessagingBuffer buffer)
+ {
+ }
+
+ public int getEncodeSize()
+ {
+ return 0;
+ }
+
+ }
+
private static class ByteArrayEncoding implements EncodingSupport
{
More information about the jboss-cvs-commits
mailing list