[jboss-cvs] JBoss Messaging SVN: r4678 - trunk/src/main/org/jboss/messaging/core/journal/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jul 11 21:19:25 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-07-11 21:19:25 -0400 (Fri, 11 Jul 2008)
New Revision: 4678
Modified:
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
Log:
This file had inconsistent indentation (a mix of spaces and tabs), so I used my IDE's formatting tool to realign it.
(No code changes in this commit, only formatting.)
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-12 01:14:15 UTC (rev 4677)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-12 01:19:25 UTC (rev 4678)
@@ -70,109 +70,109 @@
*/
public class JournalImpl implements TestableJournal
{
- private static final Logger log = Logger.getLogger(JournalImpl.class);
-
- private static final boolean trace = log.isTraceEnabled();
-
- private static final int STATE_STOPPED = 0;
-
- private static final int STATE_STARTED = 1;
-
- private static final int STATE_LOADED = 2;
-
- // The sizes of primitive types
-
- private static final int SIZE_LONG = 8;
-
- private static final int SIZE_INT = 4;
-
- private static final int SIZE_BYTE = 1;
-
- public static final int MIN_FILE_SIZE = 1024;
-
- public static final int MIN_TASK_PERIOD = 1000;
-
- //Record markers - they must be all unique
-
- public static final int SIZE_HEADER = 8;
-
- public static final int SIZE_ADD_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_INT + SIZE_BYTE; // + record.length
-
- public static final byte ADD_RECORD = 11;
-
- public static final byte SIZE_UPDATE_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_INT + SIZE_BYTE; // + record.length;
-
- public static final byte UPDATE_RECORD = 12;
-
- public static final int SIZE_DELETE_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
-
- public static final byte DELETE_RECORD = 13;
-
- public static final byte ADD_RECORD_TX = 14;
-
- public static final int SIZE_ADD_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_LONG + SIZE_INT + SIZE_BYTE; // Add the size of Bytes on this
-
- public static final int SIZE_UPDATE_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_LONG + SIZE_INT + SIZE_BYTE; // Add the size of Bytes on this
-
- public static final byte UPDATE_RECORD_TX = 15;
-
- public static final int SIZE_DELETE_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
-
- public static final byte DELETE_RECORD_TX = 16;
-
- public static final int SIZE_PREPARE_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
-
- public static final byte PREPARE_RECORD = 17;
-
-
- public static final byte SIZE_COMMIT_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
-
- public static final byte COMMIT_RECORD = 18;
-
- public static final byte SIZE_ROLLBACK_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
-
- public static final byte ROLLBACK_RECORD = 19;
-
- public static final byte DONE = 20;
-
- public static final byte FILL_CHARACTER = 74; // Letter 'J'
-
-
- // used for Asynchronous IO only (ignored on NIO).
- private final int maxAIO;
-
+ private static final Logger log = Logger.getLogger(JournalImpl.class);
+
+ private static final boolean trace = log.isTraceEnabled();
+
+ private static final int STATE_STOPPED = 0;
+
+ private static final int STATE_STARTED = 1;
+
+ private static final int STATE_LOADED = 2;
+
+ // The sizes of primitive types
+
+ private static final int SIZE_LONG = 8;
+
+ private static final int SIZE_INT = 4;
+
+ private static final int SIZE_BYTE = 1;
+
+ public static final int MIN_FILE_SIZE = 1024;
+
+ public static final int MIN_TASK_PERIOD = 1000;
+
+ //Record markers - they must be all unique
+
+ public static final int SIZE_HEADER = 8;
+
+ public static final int SIZE_ADD_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_INT + SIZE_BYTE; // + record.length
+
+ public static final byte ADD_RECORD = 11;
+
+ public static final byte SIZE_UPDATE_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_INT + SIZE_BYTE; // + record.length;
+
+ public static final byte UPDATE_RECORD = 12;
+
+ public static final int SIZE_DELETE_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+
+ public static final byte DELETE_RECORD = 13;
+
+ public static final byte ADD_RECORD_TX = 14;
+
+ public static final int SIZE_ADD_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_LONG + SIZE_INT + SIZE_BYTE; // Add the size of Bytes on this
+
+ public static final int SIZE_UPDATE_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_LONG + SIZE_INT + SIZE_BYTE; // Add the size of Bytes on this
+
+ public static final byte UPDATE_RECORD_TX = 15;
+
+ public static final int SIZE_DELETE_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
+
+ public static final byte DELETE_RECORD_TX = 16;
+
+ public static final int SIZE_PREPARE_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+
+ public static final byte PREPARE_RECORD = 17;
+
+
+ public static final byte SIZE_COMMIT_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+
+ public static final byte COMMIT_RECORD = 18;
+
+ public static final byte SIZE_ROLLBACK_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+
+ public static final byte ROLLBACK_RECORD = 19;
+
+ public static final byte DONE = 20;
+
+ public static final byte FILL_CHARACTER = 74; // Letter 'J'
+
+
// used for Asynchronous IO only (ignored on NIO).
- private final long aioTimeout; // in ms
-
- private final int fileSize;
-
- private final int minFiles;
-
- private final boolean syncTransactional;
-
- private final boolean syncNonTransactional;
-
- private final SequentialFileFactory fileFactory;
-
- private final long taskPeriod;
-
- public final String filePrefix;
-
- public final String fileExtension;
-
-
- private final Queue<JournalFile> dataFiles = new ConcurrentLinkedQueue<JournalFile>();
-
- private final Queue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
-
- private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
-
- private final Map<Long, PosFiles> posFilesMap = new ConcurrentHashMap<Long, PosFiles>();
-
- private final Map<Long, TransactionNegPos> transactionInfos = new ConcurrentHashMap<Long, TransactionNegPos>();
-
- private final ConcurrentMap<Long, TransactionCallback> transactionCallbacks = new ConcurrentHashMap<Long, TransactionCallback>();
-
+ private final int maxAIO;
+
+ // used for Asynchronous IO only (ignored on NIO).
+ private final long aioTimeout; // in ms
+
+ private final int fileSize;
+
+ private final int minFiles;
+
+ private final boolean syncTransactional;
+
+ private final boolean syncNonTransactional;
+
+ private final SequentialFileFactory fileFactory;
+
+ private final long taskPeriod;
+
+ public final String filePrefix;
+
+ public final String fileExtension;
+
+
+ private final Queue<JournalFile> dataFiles = new ConcurrentLinkedQueue<JournalFile>();
+
+ private final Queue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
+
+ private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
+
+ private final Map<Long, PosFiles> posFilesMap = new ConcurrentHashMap<Long, PosFiles>();
+
+ private final Map<Long, TransactionNegPos> transactionInfos = new ConcurrentHashMap<Long, TransactionNegPos>();
+
+ private final ConcurrentMap<Long, TransactionCallback> transactionCallbacks = new ConcurrentHashMap<Long, TransactionCallback>();
+
private ExecutorService closingExecutor = null;
/**
@@ -181,92 +181,92 @@
* */
private ExecutorService openExecutor = null;
- /*
+ /*
* We use a semaphore rather than synchronized since it performs better when
* contended
*/
-
- //TODO - improve concurrency by allowing concurrent accesses if doesn't change current file
+
+ //TODO - improve concurrency by allowing concurrent accesses if doesn't change current file
// this locks access to currentFile
- private final Semaphore lock = new Semaphore(1, true);
-
- private volatile JournalFile currentFile ;
-
- private volatile int state;
-
- private volatile long lastOrderingID;
-
- private final Timer timer = new Timer(true);
-
- private TimerTask reclaimerTask;
-
- private final AtomicLong transactionIDSequence = new AtomicLong(0);
-
- private Reclaimer reclaimer = new Reclaimer();
-
- public JournalImpl(final int fileSize, final int minFiles,
- final boolean syncTransactional, final boolean syncNonTransactional,
- final SequentialFileFactory fileFactory, final long taskPeriod,
- final String filePrefix, final String fileExtension, final int maxAIO, final long aioTimeout)
- {
- if (fileSize < MIN_FILE_SIZE)
- {
- throw new IllegalArgumentException("File size cannot be less than " + MIN_FILE_SIZE + " bytes");
- }
- if (minFiles < 2)
- {
- throw new IllegalArgumentException("minFiles cannot be less than 2");
- }
- if (fileFactory == null)
- {
- throw new NullPointerException("fileFactory is null");
- }
- if (taskPeriod < MIN_TASK_PERIOD)
- {
- throw new IllegalArgumentException("taskPeriod cannot be less than " + MIN_TASK_PERIOD);
- }
- if (filePrefix == null)
- {
- throw new NullPointerException("filePrefix is null");
- }
- if (fileExtension == null)
- {
- throw new NullPointerException("fileExtension is null");
- }
- if (maxAIO <= 0)
- {
- throw new IllegalStateException("maxAIO should aways be a positive number");
- }
- if (aioTimeout < 1)
- {
- throw new IllegalStateException("aio-timeout cannot be less than 1 second");
- }
-
- this.fileSize = fileSize;
-
- this.minFiles = minFiles;
-
- this.syncTransactional = syncTransactional;
-
- this.syncNonTransactional = syncNonTransactional;
-
- this.fileFactory = fileFactory;
-
- this.taskPeriod = taskPeriod;
-
- this.filePrefix = filePrefix;
-
- this.fileExtension = fileExtension;
-
- this.maxAIO = maxAIO;
-
- this.aioTimeout = aioTimeout;
- }
-
- // Journal implementation ----------------------------------------------------------------
-
- public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
+ private final Semaphore lock = new Semaphore(1, true);
+
+ private volatile JournalFile currentFile ;
+
+ private volatile int state;
+
+ private volatile long lastOrderingID;
+
+ private final Timer timer = new Timer(true);
+
+ private TimerTask reclaimerTask;
+
+ private final AtomicLong transactionIDSequence = new AtomicLong(0);
+
+ private Reclaimer reclaimer = new Reclaimer();
+
+ public JournalImpl(final int fileSize, final int minFiles,
+ final boolean syncTransactional, final boolean syncNonTransactional,
+ final SequentialFileFactory fileFactory, final long taskPeriod,
+ final String filePrefix, final String fileExtension, final int maxAIO, final long aioTimeout)
{
+ if (fileSize < MIN_FILE_SIZE)
+ {
+ throw new IllegalArgumentException("File size cannot be less than " + MIN_FILE_SIZE + " bytes");
+ }
+ if (minFiles < 2)
+ {
+ throw new IllegalArgumentException("minFiles cannot be less than 2");
+ }
+ if (fileFactory == null)
+ {
+ throw new NullPointerException("fileFactory is null");
+ }
+ if (taskPeriod < MIN_TASK_PERIOD)
+ {
+ throw new IllegalArgumentException("taskPeriod cannot be less than " + MIN_TASK_PERIOD);
+ }
+ if (filePrefix == null)
+ {
+ throw new NullPointerException("filePrefix is null");
+ }
+ if (fileExtension == null)
+ {
+ throw new NullPointerException("fileExtension is null");
+ }
+ if (maxAIO <= 0)
+ {
+ throw new IllegalStateException("maxAIO should aways be a positive number");
+ }
+ if (aioTimeout < 1)
+ {
+ throw new IllegalStateException("aio-timeout cannot be less than 1 second");
+ }
+
+ this.fileSize = fileSize;
+
+ this.minFiles = minFiles;
+
+ this.syncTransactional = syncTransactional;
+
+ this.syncNonTransactional = syncNonTransactional;
+
+ this.fileFactory = fileFactory;
+
+ this.taskPeriod = taskPeriod;
+
+ this.filePrefix = filePrefix;
+
+ this.fileExtension = fileExtension;
+
+ this.maxAIO = maxAIO;
+
+ this.aioTimeout = aioTimeout;
+ }
+
+ // Journal implementation ----------------------------------------------------------------
+
+ public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
+ {
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
@@ -287,34 +287,34 @@
bb.rewind();
JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
-
+
posFilesMap.put(id, new PosFiles(usedFile));
}
-
- public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
- {
- if (state != STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
-
- int size = SIZE_ADD_RECORD + record.length;
-
- ByteBuffer bb = fileFactory.newBuffer(size);
-
- bb.put(ADD_RECORD);
- bb.putLong(id);
- bb.put(recordType);
- bb.putInt(record.length);
- bb.put(record);
- bb.put(DONE);
- bb.rewind();
-
+
+ public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
+ {
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
+
+ int size = SIZE_ADD_RECORD + record.length;
+
+ ByteBuffer bb = fileFactory.newBuffer(size);
+
+ bb.put(ADD_RECORD);
+ bb.putLong(id);
+ bb.put(recordType);
+ bb.putInt(record.length);
+ bb.put(record);
+ bb.put(DONE);
+ bb.rewind();
+
JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
-
- posFilesMap.put(id, new PosFiles(usedFile));
- }
-
+
+ posFilesMap.put(id, new PosFiles(usedFile));
+ }
+
public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
{
if (state != STATE_LOADED)
@@ -340,9 +340,9 @@
bb.put(record);
bb.put(DONE);
bb.rewind();
-
+
JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
-
+
posFiles.addUpdateFile(usedFile);
}
@@ -371,44 +371,44 @@
record.encode(bb);
bb.putByte(DONE);
bb.rewind();
-
+
JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
-
+
posFiles.addUpdateFile(usedFile);
}
- public void appendDeleteRecord(long id) throws Exception
- {
- if (state != STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
-
- PosFiles posFiles = posFilesMap.remove(id);
-
- if (posFiles == null)
- {
- throw new IllegalStateException("Cannot find add info " + id);
- }
-
- int size = SIZE_DELETE_RECORD;
-
- ByteBuffer bb = fileFactory.newBuffer(size);
-
- bb.put(DELETE_RECORD);
- bb.putLong(id);
- bb.put(DONE);
- bb.rewind();
-
+ public void appendDeleteRecord(long id) throws Exception
+ {
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
+
+ PosFiles posFiles = posFilesMap.remove(id);
+
+ if (posFiles == null)
+ {
+ throw new IllegalStateException("Cannot find add info " + id);
+ }
+
+ int size = SIZE_DELETE_RECORD;
+
+ ByteBuffer bb = fileFactory.newBuffer(size);
+
+ bb.put(DELETE_RECORD);
+ bb.putLong(id);
+ bb.put(DONE);
+ bb.rewind();
+
JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
posFiles.addDelete(usedFile);
- }
-
- public long getTransactionID()
- {
- return transactionIDSequence.getAndIncrement();
- }
-
+ }
+
+ public long getTransactionID()
+ {
+ return transactionIDSequence.getAndIncrement();
+ }
+
public void appendAddRecordTransactional(final long txID, final long id, final byte recordType,
final EncodingSupport record) throws Exception
{
@@ -433,7 +433,7 @@
bb.rewind();
JournalFile usedFile;
-
+
usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
TransactionNegPos tx = getTransactionInfo(txID);
@@ -441,35 +441,35 @@
tx.addPos(usedFile, id);
}
- public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
+ public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
{
- if (state != STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
-
- int size = SIZE_ADD_RECORD_TX + record.length;
-
- ByteBuffer bb = fileFactory.newBuffer(size);
-
- bb.put(ADD_RECORD_TX);
- bb.putLong(txID);
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
+
+ int size = SIZE_ADD_RECORD_TX + record.length;
+
+ ByteBuffer bb = fileFactory.newBuffer(size);
+
+ bb.put(ADD_RECORD_TX);
+ bb.putLong(txID);
bb.put(recordType);
- bb.putLong(id);
- bb.putInt(record.length);
- bb.put(record);
- bb.put(DONE);
- bb.rewind();
-
- JournalFile usedFile;
-
+ bb.putLong(id);
+ bb.putInt(record.length);
+ bb.put(record);
+ bb.put(DONE);
+ bb.rewind();
+
+ JournalFile usedFile;
+
usedFile = appendRecord(bb, false, getTransactionCallback(txID));
-
- TransactionNegPos tx = getTransactionInfo(txID);
-
- tx.addPos(usedFile, id);
+
+ TransactionNegPos tx = getTransactionInfo(txID);
+
+ tx.addPos(usedFile, id);
}
-
+
public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, final byte[] record) throws Exception
{
if (state != STATE_LOADED)
@@ -529,130 +529,130 @@
tx.addPos(usedFile, id);
}
- public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
- {
- if (state != STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
-
- int size = SIZE_DELETE_RECORD_TX;
-
- ByteBuffer bb = fileFactory.newBuffer(size);
-
- bb.put(DELETE_RECORD_TX);
- bb.putLong(txID);
- bb.putLong(id);
- bb.put(DONE);
- bb.rewind();
-
+ public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
+ {
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
+
+ int size = SIZE_DELETE_RECORD_TX;
+
+ ByteBuffer bb = fileFactory.newBuffer(size);
+
+ bb.put(DELETE_RECORD_TX);
+ bb.putLong(txID);
+ bb.putLong(id);
+ bb.put(DONE);
+ bb.rewind();
+
JournalFile usedFile;
usedFile = appendRecord(bb, false, getTransactionCallback(txID));
-
+
TransactionNegPos tx = getTransactionInfo(txID);
-
- tx.addNeg(usedFile, id);
- }
-
- public void appendPrepareRecord(final long txID) throws Exception
- {
- if (state != STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
-
- TransactionNegPos tx = transactionInfos.get(txID);
-
- if (tx == null)
- {
- throw new IllegalStateException("Cannot find tx with id " + txID);
- }
-
- int size = SIZE_PREPARE_RECORD;
-
- ByteBuffer bb = fileFactory.newBuffer(size);
-
- bb.put(PREPARE_RECORD);
- bb.putLong(txID);
- bb.put(DONE);
- bb.rewind();
-
- JournalFile usedFile;
+ tx.addNeg(usedFile, id);
+ }
+
+ public void appendPrepareRecord(final long txID) throws Exception
+ {
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
+
+ TransactionNegPos tx = transactionInfos.get(txID);
+
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx with id " + txID);
+ }
+
+ int size = SIZE_PREPARE_RECORD;
+
+ ByteBuffer bb = fileFactory.newBuffer(size);
+
+ bb.put(PREPARE_RECORD);
+ bb.putLong(txID);
+ bb.put(DONE);
+ bb.rewind();
+
+ JournalFile usedFile;
+
usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
- tx.prepare(usedFile);
- }
-
- public void appendCommitRecord(final long txID) throws Exception
- {
- if (state != STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
+ tx.prepare(usedFile);
+ }
+
+ public void appendCommitRecord(final long txID) throws Exception
+ {
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
- TransactionNegPos tx = transactionInfos.remove(txID);
-
- if (tx == null)
- {
- throw new IllegalStateException("Cannot find tx with id " + txID);
- }
-
- int size = SIZE_COMMIT_RECORD;
-
- ByteBuffer bb = fileFactory.newBuffer(size);
-
- bb.put(COMMIT_RECORD);
- bb.putLong(txID);
- bb.put(DONE);
- bb.rewind();
-
- JournalFile usedFile;
+ TransactionNegPos tx = transactionInfos.remove(txID);
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx with id " + txID);
+ }
+
+ int size = SIZE_COMMIT_RECORD;
+
+ ByteBuffer bb = fileFactory.newBuffer(size);
+
+ bb.put(COMMIT_RECORD);
+ bb.putLong(txID);
+ bb.put(DONE);
+ bb.rewind();
+
+ JournalFile usedFile;
+
usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
-
- transactionCallbacks.remove(txID);
-
- tx.commit(usedFile);
-
- }
-
- public void appendRollbackRecord(final long txID) throws Exception
- {
- if (state != STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
-
- TransactionNegPos tx = transactionInfos.remove(txID);
-
- if (tx == null)
- {
- throw new IllegalStateException("Cannot find tx with id " + txID);
- }
-
- int size = SIZE_ROLLBACK_RECORD;
-
- ByteBuffer bb = fileFactory.newBuffer(size);
-
- bb.put(ROLLBACK_RECORD);
- bb.putLong(txID);
- bb.put(DONE);
- bb.rewind();
-
- JournalFile usedFile;
-
- usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
-
- transactionCallbacks.remove(txID);
-
- tx.rollback(usedFile);
- }
-
+
+ transactionCallbacks.remove(txID);
+
+ tx.commit(usedFile);
+
+ }
+
+ public void appendRollbackRecord(final long txID) throws Exception
+ {
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
+
+ TransactionNegPos tx = transactionInfos.remove(txID);
+
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx with id " + txID);
+ }
+
+ int size = SIZE_ROLLBACK_RECORD;
+
+ ByteBuffer bb = fileFactory.newBuffer(size);
+
+ bb.put(ROLLBACK_RECORD);
+ bb.putLong(txID);
+ bb.put(DONE);
+ bb.rewind();
+
+ JournalFile usedFile;
+
+ usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
+
+ transactionCallbacks.remove(txID);
+
+ tx.rollback(usedFile);
+ }
+
public synchronized long load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions) throws Exception
- {
+ {
if (state != STATE_STARTED)
{
throw new IllegalStateException("Journal must be in started state");
@@ -709,7 +709,7 @@
for (JournalFile file: orderedFiles)
{
file.getFile().open();
-
+
ByteBuffer bb = fileFactory.newBuffer(fileSize);
int bytesRead = file.getFile().read(bb);
@@ -732,7 +732,7 @@
int pos = bb.position();
byte recordType = bb.get();
-
+
switch(recordType)
{
case ADD_RECORD:
@@ -742,7 +742,7 @@
maxMessageID = Math.max(maxMessageID, id);
byte userRecordType = bb.get();
-
+
int size = bb.getInt();
byte[] record = new byte[size];
bb.get(record);
@@ -765,7 +765,7 @@
case UPDATE_RECORD:
{
long id = bb.getLong();
-
+
maxMessageID = Math.max(maxMessageID, id);
byte userRecordType = bb.get();
@@ -925,7 +925,7 @@
maxTransactionID = Math.max(maxTransactionID, txID);
long id = bb.getLong();
maxMessageID = Math.max(maxMessageID, id);
-
+
byte end = bb.get();
if (end != DONE)
@@ -963,7 +963,7 @@
case PREPARE_RECORD:
{
long txID = bb.getLong();
-
+
maxTransactionID = Math.max(maxTransactionID, txID);
byte end = bb.get();
@@ -1034,7 +1034,7 @@
case ROLLBACK_RECORD:
{
long txID = bb.getLong();
-
+
maxTransactionID = Math.max(maxTransactionID, txID);
byte end = bb.get();
@@ -1095,7 +1095,7 @@
}
file.getFile().close();
-
+
if (hasData)
{
dataFiles.add(file);
@@ -1192,22 +1192,22 @@
state = STATE_LOADED;
return maxMessageID;
+ }
+
+ public int getAlignment() throws Exception
+ {
+ return this.fileFactory.getAlignment();
}
-
- public int getAlignment() throws Exception
- {
- return this.fileFactory.getAlignment();
- }
-
- public synchronized void checkReclaimStatus() throws Exception
- {
- JournalFile[] files = new JournalFile[dataFiles.size()];
-
- reclaimer.scan(dataFiles.toArray(files));
- }
-
- public String debug() throws Exception
+
+ public synchronized void checkReclaimStatus() throws Exception
{
+ JournalFile[] files = new JournalFile[dataFiles.size()];
+
+ reclaimer.scan(dataFiles.toArray(files));
+ }
+
+ public String debug() throws Exception
+ {
this.checkReclaimStatus();
StringBuilder builder = new StringBuilder();
@@ -1235,12 +1235,12 @@
}
builder.append("#Opened Files:" + this.openedFiles.size());
-
+
return builder.toString();
}
// TestableJournal implementation --------------------------------------------------------------
-
+
/** Method for use on testcases.
* It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
public void debugWait() throws Exception
@@ -1254,7 +1254,7 @@
{
// Send something to the closingExecutor, just to make sure we went until its end
final CountDownLatch latch = new CountDownLatch(1);
-
+
this.closingExecutor.execute(new Runnable()
{
public void run()
@@ -1265,12 +1265,12 @@
latch.await();
}
-
+
if (openExecutor != null && !openExecutor.isShutdown())
{
// Send something to the closingExecutor, just to make sure we went until its end
final CountDownLatch latch = new CountDownLatch(1);
-
+
this.openExecutor.execute(new Runnable()
{
public void run()
@@ -1281,93 +1281,93 @@
latch.await();
}
+
+ }
- }
-
- public synchronized void checkAndReclaimFiles() throws Exception
- {
- checkReclaimStatus();
-
- for (JournalFile file: dataFiles)
- {
- if (file.isCanReclaim())
- {
- //File can be reclaimed or deleted
-
+ public synchronized void checkAndReclaimFiles() throws Exception
+ {
+ checkReclaimStatus();
+
+ for (JournalFile file: dataFiles)
+ {
+ if (file.isCanReclaim())
+ {
+ //File can be reclaimed or deleted
+
if (trace) log.trace("Reclaiming file " + file);
log.info("Reclaiming file " + file); // remove this
-
- dataFiles.remove(file);
-
- //FIXME - size() involves a scan!!!
- if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
- {
- //Re-initialise it
-
- long newOrderingID = generateOrderingID();
-
- SequentialFile sf = file.getFile();
-
+
+ dataFiles.remove(file);
+
+ //FIXME - size() involves a scan!!!
+ if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
+ {
+ //Re-initialise it
+
+ long newOrderingID = generateOrderingID();
+
+ SequentialFile sf = file.getFile();
+
log.info("Adding " + sf + "to freeFiles"); // remove this
-
+
sf.open();
-
- ByteBuffer bb = fileFactory.newBuffer(SIZE_LONG);
-
- bb.putLong(newOrderingID);
-
- //Note we MUST re-fill it - otherwise we won't be able to detect corrupt records
-
- //TODO - if we can avoid this somehow would be good, since filling the file is a heavyweight
- //operation and can impact other IO operations on the disk
- sf.fill(0, fileSize, FILL_CHARACTER);
-
- int bytesWritten = sf.write(bb, true);
-
- JournalFile jf = new JournalFileImpl(sf, newOrderingID);
-
- sf.position(bytesWritten);
-
- jf.setOffset(bytesWritten);
-
- sf.close();
-
- freeFiles.add(jf);
- }
- else
- {
+
+ ByteBuffer bb = fileFactory.newBuffer(SIZE_LONG);
+
+ bb.putLong(newOrderingID);
+
+ //Note we MUST re-fill it - otherwise we won't be able to detect corrupt records
+
+ //TODO - if we can avoid this somehow would be good, since filling the file is a heavyweight
+ //operation and can impact other IO operations on the disk
+ sf.fill(0, fileSize, FILL_CHARACTER);
+
+ int bytesWritten = sf.write(bb, true);
+
+ JournalFile jf = new JournalFileImpl(sf, newOrderingID);
+
+ sf.position(bytesWritten);
+
+ jf.setOffset(bytesWritten);
+
+ sf.close();
+
+ freeFiles.add(jf);
+ }
+ else
+ {
log.info("Deleting " + file.getFile()); // remove this
-
- file.getFile().open();
-
- file.getFile().delete();
- }
- }
- }
- }
-
- public int getDataFilesCount()
- {
- return dataFiles.size();
- }
-
- public int getFreeFilesCount()
- {
- return freeFiles.size();
- }
-
- public int getOpenedFilesCount()
- {
- return openedFiles.size();
- }
-
- public int getIDMapSize()
- {
- return posFilesMap.size();
- }
-
- public int getFileSize()
+
+ file.getFile().open();
+
+ file.getFile().delete();
+ }
+ }
+ }
+ }
+
+ public int getDataFilesCount()
{
+ return dataFiles.size();
+ }
+
+ public int getFreeFilesCount()
+ {
+ return freeFiles.size();
+ }
+
+ public int getOpenedFilesCount()
+ {
+ return openedFiles.size();
+ }
+
+ public int getIDMapSize()
+ {
+ return posFilesMap.size();
+ }
+
+ public int getFileSize()
+ {
return fileSize;
}
@@ -1427,234 +1427,234 @@
debugWait();
}
-
- // MessagingComponent implementation ---------------------------------------------------
-
- public synchronized boolean isStarted()
- {
- return state != STATE_STOPPED;
- }
-
- public synchronized void start()
- {
- if (state != STATE_STOPPED)
- {
- throw new IllegalStateException("Journal is not stopped");
- }
-
- this.openExecutor = Executors.newSingleThreadExecutor();
- this.closingExecutor = Executors.newSingleThreadExecutor();
-
- state = STATE_STARTED;
- }
-
- public synchronized void stop() throws Exception
- {
- if (state == STATE_STOPPED)
- {
- throw new IllegalStateException("Journal is already stopped");
- }
-
- stopReclaimer();
-
- closingExecutor.shutdown();
- if (!closingExecutor.awaitTermination(aioTimeout, TimeUnit.MILLISECONDS))
- {
- throw new IllegalStateException("Time out waiting for closing executor to finish");
- }
-
- if (currentFile != null)
- {
- currentFile.getFile().close();
- }
-
- openExecutor.shutdown();
+
+ // MessagingComponent implementation ---------------------------------------------------
+
+ public synchronized boolean isStarted()
+ {
+ return state != STATE_STOPPED;
+ }
+
+ public synchronized void start()
+ {
+ if (state != STATE_STOPPED)
+ {
+ throw new IllegalStateException("Journal is not stopped");
+ }
+
+ this.openExecutor = Executors.newSingleThreadExecutor();
+ this.closingExecutor = Executors.newSingleThreadExecutor();
+
+ state = STATE_STARTED;
+ }
+
+ public synchronized void stop() throws Exception
+ {
+ if (state == STATE_STOPPED)
+ {
+ throw new IllegalStateException("Journal is already stopped");
+ }
+
+ stopReclaimer();
+
+ closingExecutor.shutdown();
+ if (!closingExecutor.awaitTermination(aioTimeout, TimeUnit.MILLISECONDS))
+ {
+ throw new IllegalStateException("Time out waiting for closing executor to finish");
+ }
+
+ if (currentFile != null)
+ {
+ currentFile.getFile().close();
+ }
+
+ openExecutor.shutdown();
if (!openExecutor.awaitTermination(aioTimeout, TimeUnit.MILLISECONDS))
{
throw new IllegalStateException("Time out waiting for open executor to finish");
}
- for (JournalFile file: openedFiles)
- {
- file.getFile().close();
- }
-
- currentFile = null;
-
- dataFiles.clear();
-
- freeFiles.clear();
-
- openedFiles.clear();
-
- state = STATE_STOPPED;
- }
-
- public void startReclaimer()
- {
- if (state == STATE_STOPPED)
- {
- throw new IllegalStateException("Journal is stopped");
- }
-
- reclaimerTask = new ReclaimerTask();
-
- timer.schedule(reclaimerTask, taskPeriod, taskPeriod);
- }
-
- public void stopReclaimer()
- {
- if (state == STATE_STOPPED)
- {
- throw new IllegalStateException("Journal is already stopped");
- }
-
- if (reclaimerTask != null)
- {
- reclaimerTask.cancel();
- }
- }
-
- // Public -----------------------------------------------------------------------------
-
- // Private -----------------------------------------------------------------------------
-
- private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
- {
- lock.acquire();
-
- int size = bb.capacity();
-
- try
- {
- checkFile(size);
- if (callback != null)
- {
- currentFile.getFile().write(bb, callback);
- if (sync)
- {
- callback.waitCompletion(aioTimeout);
- }
- }
- else
- {
- currentFile.getFile().write(bb, sync);
- }
- currentFile.extendOffset(size);
- return currentFile;
- }
- finally
- {
- lock.release();
- }
- }
-
- private void repairFrom(final int pos, final JournalFile file) throws Exception
- {
- log.warn("Corruption has been detected in file: " + file.getFile().getFileName() +
- " in the record that starts at position " + pos + ". " +
- "The most likely cause is that a crash occurred in the previous run. The corrupt record will be discarded.");
-
- file.getFile().fill(pos, fileSize - pos, FILL_CHARACTER);
-
- file.getFile().position(pos);
- }
-
- private JournalFile createFile(boolean keepOpened) throws Exception
- {
- long orderingID = generateOrderingID();
-
- String fileName = filePrefix + "-" + orderingID + "." + fileExtension;
-
- if (trace) log.trace("Creating file " + fileName);
-
- SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, maxAIO, aioTimeout);
-
- sequentialFile.open();
-
- sequentialFile.fill(0, fileSize, FILL_CHARACTER);
-
- ByteBuffer bb = fileFactory.newBuffer(SIZE_LONG);
-
- bb.putLong(orderingID);
-
- bb.rewind();
-
- int bytesWritten = sequentialFile.write(bb, true);
-
- JournalFile info = new JournalFileImpl(sequentialFile, orderingID);
-
- info.extendOffset(bytesWritten);
-
- if (!keepOpened)
+ for (JournalFile file: openedFiles)
{
+ file.getFile().close();
+ }
+
+ currentFile = null;
+
+ dataFiles.clear();
+
+ freeFiles.clear();
+
+ openedFiles.clear();
+
+ state = STATE_STOPPED;
+ }
+
+ public void startReclaimer()
+ {
+ if (state == STATE_STOPPED)
+ {
+ throw new IllegalStateException("Journal is stopped");
+ }
+
+ reclaimerTask = new ReclaimerTask();
+
+ timer.schedule(reclaimerTask, taskPeriod, taskPeriod);
+ }
+
+ public void stopReclaimer()
+ {
+ if (state == STATE_STOPPED)
+ {
+ throw new IllegalStateException("Journal is already stopped");
+ }
+
+ if (reclaimerTask != null)
+ {
+ reclaimerTask.cancel();
+ }
+ }
+
+ // Public -----------------------------------------------------------------------------
+
+ // Private -----------------------------------------------------------------------------
+
+ private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
+ {
+ lock.acquire();
+
+ int size = bb.capacity();
+
+ try
+ {
+ checkFile(size);
+ if (callback != null)
+ {
+ currentFile.getFile().write(bb, callback);
+ if (sync)
+ {
+ callback.waitCompletion(aioTimeout);
+ }
+ }
+ else
+ {
+ currentFile.getFile().write(bb, sync);
+ }
+ currentFile.extendOffset(size);
+ return currentFile;
+ }
+ finally
+ {
+ lock.release();
+ }
+ }
+
+ private void repairFrom(final int pos, final JournalFile file) throws Exception
+ {
+ log.warn("Corruption has been detected in file: " + file.getFile().getFileName() +
+ " in the record that starts at position " + pos + ". " +
+ "The most likely cause is that a crash occurred in the previous run. The corrupt record will be discarded.");
+
+ file.getFile().fill(pos, fileSize - pos, FILL_CHARACTER);
+
+ file.getFile().position(pos);
+ }
+
+ private JournalFile createFile(boolean keepOpened) throws Exception
+ {
+ long orderingID = generateOrderingID();
+
+ String fileName = filePrefix + "-" + orderingID + "." + fileExtension;
+
+ if (trace) log.trace("Creating file " + fileName);
+
+ SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, maxAIO, aioTimeout);
+
+ sequentialFile.open();
+
+ sequentialFile.fill(0, fileSize, FILL_CHARACTER);
+
+ ByteBuffer bb = fileFactory.newBuffer(SIZE_LONG);
+
+ bb.putLong(orderingID);
+
+ bb.rewind();
+
+ int bytesWritten = sequentialFile.write(bb, true);
+
+ JournalFile info = new JournalFileImpl(sequentialFile, orderingID);
+
+ info.extendOffset(bytesWritten);
+
+ if (!keepOpened)
+ {
sequentialFile.close();
}
-
- return info;
- }
-
- private void openFile(JournalFile file) throws Exception
- {
- file.getFile().open();
- file.getFile().position(file.getFile().calculateBlockStart(SIZE_LONG));
- }
-
- private long generateOrderingID()
- {
- long orderingID = System.currentTimeMillis();
-
- while (orderingID == lastOrderingID)
- {
- //Ensure it's unique
- try
- {
- Thread.sleep(1);
- }
- catch (InterruptedException ignore)
- {
- }
- orderingID = System.currentTimeMillis();
- }
- lastOrderingID = orderingID;
-
- return orderingID;
- }
-
- // You need to guarantee lock.acquire() over currentFile before calling this method
- private void checkFile(final int size) throws Exception
- {
- if (size % currentFile.getFile().getAlignment() != 0)
- {
- throw new IllegalStateException("You can't write blocks in a size different than " + currentFile.getFile().getAlignment());
- }
-
- //We take into account the first timestamp long
- if (size > fileSize - currentFile.getFile().calculateBlockStart(SIZE_HEADER))
- {
- throw new IllegalArgumentException("Record is too large to store " + size);
- }
-
- if (currentFile == null || fileSize - currentFile.getOffset() < size)
- {
- moveNextFile();
-
- }
- }
-
- // You need to guarantee lock.acquire() before calling this method
+
+ return info;
+ }
+
+ private void openFile(JournalFile file) throws Exception
+ {
+ file.getFile().open();
+ file.getFile().position(file.getFile().calculateBlockStart(SIZE_LONG));
+ }
+
+ private long generateOrderingID()
+ {
+ long orderingID = System.currentTimeMillis();
+
+ while (orderingID == lastOrderingID)
+ {
+ //Ensure it's unique
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ orderingID = System.currentTimeMillis();
+ }
+ lastOrderingID = orderingID;
+
+ return orderingID;
+ }
+
+ // You need to guarantee lock.acquire() over currentFile before calling this method
+ private void checkFile(final int size) throws Exception
+ {
+ if (size % currentFile.getFile().getAlignment() != 0)
+ {
+ throw new IllegalStateException("You can't write blocks in a size different than " + currentFile.getFile().getAlignment());
+ }
+
+ //We take into account the first timestamp long
+ if (size > fileSize - currentFile.getFile().calculateBlockStart(SIZE_HEADER))
+ {
+ throw new IllegalArgumentException("Record is too large to store " + size);
+ }
+
+ if (currentFile == null || fileSize - currentFile.getOffset() < size)
+ {
+ moveNextFile();
+
+ }
+ }
+
+ // You need to guarantee lock.acquire() before calling this method
private void moveNextFile() throws InterruptedException
{
closeFile(currentFile);
-
+
currentFile = enqueueOpenFile();
}
-
+
// You need to guarantee lock.acquire() before calling this method
- private JournalFile enqueueOpenFile() throws InterruptedException
- {
- if (trace) log.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
- openExecutor.execute(new Runnable()
+ private JournalFile enqueueOpenFile() throws InterruptedException
+ {
+ if (trace) log.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
+ openExecutor.execute(new Runnable()
{
public void run()
{
@@ -1668,18 +1668,18 @@
}
}
});
-
- JournalFile nextFile = openedFiles.poll(aioTimeout, TimeUnit.SECONDS);
-
- if (nextFile == null)
- {
+
+ JournalFile nextFile = openedFiles.poll(aioTimeout, TimeUnit.SECONDS);
+
+ if (nextFile == null)
+ {
throw new IllegalStateException("Timed out waiting for an opened file");
- }
-
- return nextFile;
- }
-
-
+ }
+
+ return nextFile;
+ }
+
+
/**
*
* Open a file and place it into the openedFiles queue
@@ -1694,7 +1694,7 @@
catch (NoSuchElementException ignored)
{
}
-
+
if (nextOpenedFile == null)
{
nextOpenedFile = createFile(true);
@@ -1703,42 +1703,42 @@
{
openFile(nextOpenedFile);
}
-
+
openedFiles.offer(nextOpenedFile);
}
-
- private void closeFile(final JournalFile file)
- {
- this.closingExecutor.execute(new Runnable() { public void run()
- {
- try
- {
- file.getFile().close();
- }
- catch (Exception e)
- {
- log.warn(e.getMessage(), e);
- }
- dataFiles.add(file);
- }
- });
- }
-
- private TransactionNegPos getTransactionInfo(final long txID)
- {
- TransactionNegPos tx = transactionInfos.get(txID);
-
- if (tx == null)
- {
- tx = new TransactionNegPos();
-
- transactionInfos.put(txID, tx);
- }
-
- return tx;
- }
-
+
+ private void closeFile(final JournalFile file)
+ {
+ this.closingExecutor.execute(new Runnable() { public void run()
+ {
+ try
+ {
+ file.getFile().close();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ dataFiles.add(file);
+ }
+ });
+ }
+
+ private TransactionNegPos getTransactionInfo(final long txID)
+ {
+ TransactionNegPos tx = transactionInfos.get(txID);
+
+ if (tx == null)
+ {
+ tx = new TransactionNegPos();
+
+ transactionInfos.put(txID, tx);
+ }
+
+ return tx;
+ }
+
private TransactionCallback getTransactionCallback(final long transactionId)
{
if (fileFactory.isSupportsCallbacks() && syncTransactional)
@@ -1750,7 +1750,7 @@
callback = new TransactionCallback();
transactionCallbacks.put(transactionId, callback);
}
-
+
callback.countUp();
return callback;
}
@@ -1760,8 +1760,8 @@
}
}
- // Inner classes ---------------------------------------------------------------------------
-
+ // Inner classes ---------------------------------------------------------------------------
+
private static class TransactionCallback implements IOCallback
{
private final VariableLatch countLatch = new VariableLatch();
@@ -1774,7 +1774,7 @@
{
countLatch.up();
}
-
+
public void done()
{
countLatch.down();
@@ -1789,7 +1789,7 @@
throw new IllegalStateException("Error on Transaction: " + errorCode + " - " + errorMessage);
}
}
-
+
public void onError(final int errorCode, final String errorMessage)
{
this.errorMessage = errorMessage;
@@ -1798,193 +1798,193 @@
}
}
-
- private class ReclaimerTask extends TimerTask
- {
- public synchronized boolean cancel()
- {
- timer.cancel();
-
- return super.cancel();
- }
-
- public synchronized void run()
- {
- try
- {
- checkAndReclaimFiles();
- }
- catch (Exception e)
- {
- log.error("Failure in running ReclaimerTask", e);
-
- cancel();
- }
- }
- }
-
- private static class PosFiles
- {
- private final JournalFile addFile;
-
- private List<JournalFile> updateFiles;
-
- PosFiles(final JournalFile addFile)
- {
- this.addFile = addFile;
-
- addFile.incPosCount();
- }
-
- void addUpdateFile(final JournalFile updateFile)
- {
- if (updateFiles == null)
- {
- updateFiles = new ArrayList<JournalFile>();
- }
-
- updateFiles.add(updateFile);
-
- updateFile.incPosCount();
- }
-
- void addDelete(final JournalFile file)
- {
- file.incNegCount(addFile);
-
- if (updateFiles != null)
- {
- for (JournalFile jf: updateFiles)
- {
- file.incNegCount(jf);
- }
- }
- }
- }
-
- private class TransactionNegPos
- {
- private List<Pair<JournalFile, Long>> pos;
-
- private List<Pair<JournalFile, Long>> neg;
-
- private Set<JournalFile> transactionPos;
-
- void addTXPosCount(final JournalFile file)
- {
- if (transactionPos == null)
- {
- transactionPos = new HashSet<JournalFile>();
- }
-
- if (!transactionPos.contains(file))
- {
- transactionPos.add(file);
-
- //We add a pos for the transaction itself in the file - this prevents any transactional operations
- //being deleted before a commit or rollback is written
- file.incPosCount();
- }
- }
-
- void addPos(final JournalFile file, final long id)
- {
- addTXPosCount(file);
-
- if (pos == null)
- {
- pos = new ArrayList<Pair<JournalFile, Long>>();
- }
-
- pos.add(new Pair<JournalFile, Long>(file, id));
- }
-
- void addNeg(final JournalFile file, final long id)
- {
- addTXPosCount(file);
-
- if (neg == null)
- {
- neg = new ArrayList<Pair<JournalFile, Long>>();
- }
-
- neg.add(new Pair<JournalFile, Long>(file, id));
- }
-
- void commit(final JournalFile file)
- {
- if (pos != null)
- {
- for (Pair<JournalFile, Long> p: pos)
- {
- PosFiles posFiles = posFilesMap.get(p.b);
-
- if (posFiles == null)
- {
- posFiles = new PosFiles(p.a);
-
- posFilesMap.put(p.b, posFiles);
- }
- else
- {
- posFiles.addUpdateFile(p.a);
- }
- }
- }
-
- if (neg != null)
- {
- for (Pair<JournalFile, Long> n: neg)
- {
- PosFiles posFiles = posFilesMap.remove(n.b);
-
- if (posFiles != null)
- {
- //throw new IllegalStateException("Cannot find add info " + n.b);
- posFiles.addDelete(n.a);
- }
-
- }
- }
-
- //Now add negs for the pos we added in each file in which there were transactional operations
-
- for (JournalFile jf: transactionPos)
- {
- file.incNegCount(jf);
- }
- }
-
- void rollback(JournalFile file)
- {
- //Now add negs for the pos we added in each file in which there were transactional operations
- //Note that we do this on rollback as we do on commit, since we need to ensure the file containing
- //the rollback record doesn't get deleted before the files with the transactional operations are deleted
- //Otherwise we may run into problems especially with XA where we are just left with a prepare when the tx
- //has actually been rolled back
-
- for (JournalFile jf: transactionPos)
- {
- file.incNegCount(jf);
- }
- }
-
- void prepare(JournalFile file)
- {
- //We don't want the prepare record getting deleted before time
-
- addTXPosCount(file);
- }
-
- void forget()
- {
- //The transaction was not committed or rolled back in the file, so we reverse any pos counts we added
-
- for (JournalFile jf: transactionPos)
- {
- jf.decPosCount();
- }
- }
- }
-
+
+ private class ReclaimerTask extends TimerTask
+ {
+ public synchronized boolean cancel()
+ {
+ timer.cancel();
+
+ return super.cancel();
+ }
+
+ public synchronized void run()
+ {
+ try
+ {
+ checkAndReclaimFiles();
+ }
+ catch (Exception e)
+ {
+ log.error("Failure in running ReclaimerTask", e);
+
+ cancel();
+ }
+ }
+ }
+
+ private static class PosFiles
+ {
+ private final JournalFile addFile;
+
+ private List<JournalFile> updateFiles;
+
+ PosFiles(final JournalFile addFile)
+ {
+ this.addFile = addFile;
+
+ addFile.incPosCount();
+ }
+
+ void addUpdateFile(final JournalFile updateFile)
+ {
+ if (updateFiles == null)
+ {
+ updateFiles = new ArrayList<JournalFile>();
+ }
+
+ updateFiles.add(updateFile);
+
+ updateFile.incPosCount();
+ }
+
+ void addDelete(final JournalFile file)
+ {
+ file.incNegCount(addFile);
+
+ if (updateFiles != null)
+ {
+ for (JournalFile jf: updateFiles)
+ {
+ file.incNegCount(jf);
+ }
+ }
+ }
+ }
+
+ private class TransactionNegPos
+ {
+ private List<Pair<JournalFile, Long>> pos;
+
+ private List<Pair<JournalFile, Long>> neg;
+
+ private Set<JournalFile> transactionPos;
+
+ void addTXPosCount(final JournalFile file)
+ {
+ if (transactionPos == null)
+ {
+ transactionPos = new HashSet<JournalFile>();
+ }
+
+ if (!transactionPos.contains(file))
+ {
+ transactionPos.add(file);
+
+ //We add a pos for the transaction itself in the file - this prevents any transactional operations
+ //being deleted before a commit or rollback is written
+ file.incPosCount();
+ }
+ }
+
+ void addPos(final JournalFile file, final long id)
+ {
+ addTXPosCount(file);
+
+ if (pos == null)
+ {
+ pos = new ArrayList<Pair<JournalFile, Long>>();
+ }
+
+ pos.add(new Pair<JournalFile, Long>(file, id));
+ }
+
+ void addNeg(final JournalFile file, final long id)
+ {
+ addTXPosCount(file);
+
+ if (neg == null)
+ {
+ neg = new ArrayList<Pair<JournalFile, Long>>();
+ }
+
+ neg.add(new Pair<JournalFile, Long>(file, id));
+ }
+
+ void commit(final JournalFile file)
+ {
+ if (pos != null)
+ {
+ for (Pair<JournalFile, Long> p: pos)
+ {
+ PosFiles posFiles = posFilesMap.get(p.b);
+
+ if (posFiles == null)
+ {
+ posFiles = new PosFiles(p.a);
+
+ posFilesMap.put(p.b, posFiles);
+ }
+ else
+ {
+ posFiles.addUpdateFile(p.a);
+ }
+ }
+ }
+
+ if (neg != null)
+ {
+ for (Pair<JournalFile, Long> n: neg)
+ {
+ PosFiles posFiles = posFilesMap.remove(n.b);
+
+ if (posFiles != null)
+ {
+ //throw new IllegalStateException("Cannot find add info " + n.b);
+ posFiles.addDelete(n.a);
+ }
+
+ }
+ }
+
+ //Now add negs for the pos we added in each file in which there were transactional operations
+
+ for (JournalFile jf: transactionPos)
+ {
+ file.incNegCount(jf);
+ }
+ }
+
+ void rollback(JournalFile file)
+ {
+ //Now add negs for the pos we added in each file in which there were transactional operations
+ //Note that we do this on rollback as we do on commit, since we need to ensure the file containing
+ //the rollback record doesn't get deleted before the files with the transactional operations are deleted
+ //Otherwise we may run into problems especially with XA where we are just left with a prepare when the tx
+ //has actually been rolled back
+
+ for (JournalFile jf: transactionPos)
+ {
+ file.incNegCount(jf);
+ }
+ }
+
+ void prepare(JournalFile file)
+ {
+ //We don't want the prepare record getting deleted before time
+
+ addTXPosCount(file);
+ }
+
+ void forget()
+ {
+ //The transaction was not committed or rolled back in the file, so we reverse any pos counts we added
+
+ for (JournalFile jf: transactionPos)
+ {
+ jf.decPosCount();
+ }
+ }
+ }
+
}
More information about the jboss-cvs-commits
mailing list