Author: borges
Date: 2011-08-11 13:09:12 -0400 (Thu, 11 Aug 2011)
New Revision: 11189
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
Log:
HORNETQ-720 Add number of operations to COMMIT/PREPARE journal entries
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java	2011-08-11 17:08:43 UTC (rev 11188)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java	2011-08-11 17:09:12 UTC (rev 11189)
@@ -1,6 +1,9 @@
package org.hornetq.core.journal.impl;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.hornetq.api.core.HornetQException;
@@ -29,6 +32,8 @@
private final ReentrantLock lockAppend = new ReentrantLock();
// private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
+ private final ConcurrentMap<Long, AtomicInteger> transactions = new ConcurrentHashMap<Long, AtomicInteger>();
+
private final JournalFile currentFile;
/**
@@ -62,16 +67,6 @@
// ------------------------
-// private void readLockJournal()
-// {
-// journalLock.readLock().lock();
-// }
-//
-// private void readUnlockJournal()
-// {
-// journalLock.readLock().unlock();
-// }
-
@Override
public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean
sync, IOCompletion callback)
throws Exception
@@ -84,10 +79,10 @@
/**
* Write the record to the current file.
*/
- private void writeRecord(JournalInternalRecord encoder, boolean sync, IOCompletion callback) throws Exception
+ private void writeRecord(JournalInternalRecord encoder, final boolean sync, final IOCompletion callback)
+ throws Exception
{
-
lockAppend.lock();
try
{
@@ -130,6 +125,7 @@
public void appendAddRecordTransactional(long txID, long id, byte recordType,
EncodingSupport record)
throws Exception
{
+ count(txID);
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id,
recordType, record);
writeRecord(addRecord, false, null);
}
@@ -147,6 +143,7 @@
public void appendUpdateRecordTransactional(long txID, long id, byte recordType,
EncodingSupport record)
throws Exception
{
+ count(txID);
JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id,
recordType, record);
writeRecord(updateRecordTX, false, null);
}
@@ -156,7 +153,13 @@
throws Exception
{
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(true, txID,
null);
- writeRecord(commitRecord, sync, callback);
+ AtomicInteger value = transactions.remove(Long.valueOf(txID));
+ if (value != null)
+ {
+ commitRecord.setNumberOfRecords(value.get());
+ }
+
+ writeRecord(commitRecord, true, callback);
}
@Override
@@ -164,9 +167,27 @@
throws Exception
{
JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(false, txID,
transactionData);
+ AtomicInteger value = transactions.get(Long.valueOf(txID));
+ if (value != null)
+ {
+ prepareRecord.setNumberOfRecords(value.get());
+ }
writeRecord(prepareRecord, sync, callback);
}
+ private int count(long txID) throws HornetQException
+ {
+ AtomicInteger defaultValue = new AtomicInteger(1);
+ AtomicInteger count = transactions.putIfAbsent(Long.valueOf(txID), defaultValue);
+ if (count != null)
+ {
+ return count.incrementAndGet();
+ }
+ return defaultValue.get();
+ }
+
+ // UNSUPPORTED STUFF
+
@Override
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback)
throws Exception
{