[jboss-cvs] JBoss Messaging SVN: r4756 - trunk/src/main/org/jboss/messaging/core/journal/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jul 31 05:41:34 EDT 2008
Author: timfox
Date: 2008-07-31 05:41:33 -0400 (Thu, 31 Jul 2008)
New Revision: 4756
Modified:
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
Log:
Mainly cosmetics and adding final modifier
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-31 09:30:40 UTC (rev 4755)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-31 09:41:33 UTC (rev 4756)
@@ -91,8 +91,7 @@
private static final int SIZE_BYTE = 1;
public static final int MIN_FILE_SIZE = 1024;
-
-
+
public static final int SIZE_HEADER = 4;
//Record markers - they must be all unique
@@ -191,7 +190,7 @@
private final AtomicLong transactionIDSequence = new AtomicLong(0);
- private Reclaimer reclaimer = new Reclaimer();
+ private final Reclaimer reclaimer = new Reclaimer();
// Static --------------------------------------------------------
@@ -202,16 +201,16 @@
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging Journal
private static final void trace(String message)
- {
+ {
log.trace(message);
}
// Constructors --------------------------------------------------
public JournalImpl(final int fileSize, final int minFiles,
- final boolean syncTransactional, final boolean syncNonTransactional,
- final SequentialFileFactory fileFactory,
- final String filePrefix, final String fileExtension, final int maxAIO)
+ final boolean syncTransactional, final boolean syncNonTransactional,
+ final SequentialFileFactory fileFactory,
+ final String filePrefix, final String fileExtension, final int maxAIO)
{
if (fileSize < MIN_FILE_SIZE)
{
@@ -303,8 +302,7 @@
{
throw new IllegalStateException("Journal must be loaded first");
}
-
-
+
int size = SIZE_ADD_RECORD + record.length;
ByteBuffer bb = fileFactory.newBuffer(size);
@@ -317,8 +315,7 @@
bb.put(record);
bb.putInt(size);
bb.rewind();
-
-
+
try
{
lock.acquire();
@@ -361,10 +358,9 @@
bb.putInt(size);
bb.rewind();
+ lock.acquire();
try
- {
- lock.acquire();
-
+ {
JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
posFiles.addUpdateFile(usedFile);
@@ -402,10 +398,10 @@
bb.putInt(size);
bb.rewind();
- try
- {
- lock.acquire();
+ lock.acquire();
+ try
+ {
JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
posFiles.addUpdateFile(usedFile);
@@ -416,7 +412,7 @@
}
}
- public void appendDeleteRecord(long id) throws Exception
+ public void appendDeleteRecord(final long id) throws Exception
{
if (state != STATE_LOADED)
{
@@ -440,10 +436,10 @@
bb.putInt(size);
bb.rewind();
+ lock.acquire();
+
try
- {
- lock.acquire();
-
+ {
JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
posFiles.addDelete(usedFile);
@@ -483,16 +479,15 @@
bb.putInt(size);
bb.rewind();
+ lock.acquire();
+
try
- {
- lock.acquire();
-
+ {
JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
JournalTransaction tx = getTransactionInfo(txID);
- tx.addPositive(usedFile, id);
-
+ tx.addPositive(usedFile, id);
}
finally
{
@@ -521,16 +516,15 @@
bb.putInt(size);
bb.rewind();
- try
- {
- lock.acquire();
+ lock.acquire();
+ try
+ {
JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
JournalTransaction tx = getTransactionInfo(txID);
tx.addPositive(usedFile, id);
-
}
finally
{
@@ -559,16 +553,15 @@
bb.putInt(size);
bb.rewind();
+ lock.acquire();
+
try
- {
- lock.acquire();
-
+ {
JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
JournalTransaction tx = getTransactionInfo(txID);
tx.addPositive(usedFile, id);
-
}
finally
{
@@ -586,8 +579,7 @@
int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
ByteBufferWrapper bb = new ByteBufferWrapper(fileFactory.newBuffer(size));
-
-
+
bb.putByte(UPDATE_RECORD_TX);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
bb.putLong(txID);
@@ -598,16 +590,15 @@
bb.putInt(size);
bb.rewind();
+ lock.acquire();
+
try
- {
- lock.acquire();
-
+ {
JournalFile usedFile = appendRecord(bb.getBuffer(), false, getTransactionCallback(txID));
JournalTransaction tx = getTransactionInfo(txID);
tx.addPositive(usedFile, id);
-
}
finally
{
@@ -633,16 +624,15 @@
bb.putInt(size);
bb.rewind();
+ lock.acquire();
+
try
- {
- lock.acquire();
-
+ {
JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
JournalTransaction tx = getTransactionInfo(txID);
tx.addNegative(usedFile, id);
-
}
finally
{
@@ -666,15 +656,13 @@
ByteBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx);
+ lock.acquire();
try
- {
- lock.acquire();
-
+ {
JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
tx.prepare(usedFile);
-
}
finally
{
@@ -698,26 +686,20 @@
ByteBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx);
+ lock.acquire();
try
- {
- lock.acquire();
-
+ {
JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
transactionCallbacks.remove(txID);
tx.commit(usedFile);
-
}
finally
{
lock.release();
}
-
-
-
-
}
public void appendRollbackRecord(final long txID) throws Exception
@@ -744,16 +726,15 @@
bb.putInt(size);
bb.rewind();
+ lock.acquire();
+
try
- {
- lock.acquire();
-
+ {
JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
transactionCallbacks.remove(txID);
tx.rollback(usedFile);
-
}
finally
{
@@ -762,18 +743,14 @@
}
public synchronized long load(final List<RecordInfo> committedRecords,
- final List<PreparedTransactionInfo> preparedTransactions) throws Exception
- {
-
+ final List<PreparedTransactionInfo> preparedTransactions) throws Exception
+ {
final Set<Long> recordsToDelete = new HashSet<Long>();
final List<RecordInfo> records = new ArrayList<RecordInfo>();
-
- long maxID = load (new LoadManager()
+ long maxID = load (new LoadManager()
{
-
- public void addPreparedTransaction(
- PreparedTransactionInfo preparedTransaction)
+ public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
{
preparedTransactions.add(preparedTransaction);
}
@@ -791,11 +768,9 @@
public void deleteRecord(long id)
{
recordsToDelete.add(id);
- }
-
+ }
});
-
-
+
for (RecordInfo record: records)
{
if (!recordsToDelete.contains(record.id))
@@ -807,9 +782,8 @@
return maxID;
}
- public synchronized long load (LoadManager loadManager) throws Exception
- {
-
+ public synchronized long load(final LoadManager loadManager) throws Exception
+ {
if (state != STATE_STARTED)
{
throw new IllegalStateException("Journal must be in started state");
@@ -835,10 +809,8 @@
if (bytesRead != fileSize)
{
- //deal with this better
-
throw new IllegalStateException("File is wrong size " + bytesRead +
- " expected " + fileSize + " : " + file.getFile().getFileName());
+ " expected " + fileSize + " : " + file.getFile().getFileName());
}
//First long is the ordering timestamp, we just jump its position
@@ -1058,8 +1030,7 @@
Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
if (tx != null)
- {
-
+ {
tx.prepared = true;
JournalTransaction journalTransaction = transactionInfos.get(transactionID);
@@ -1068,11 +1039,9 @@
{
throw new IllegalStateException("Cannot find tx " + transactionID);
}
+
+ boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, values);
-
- boolean healthy = checkTransactionHealth(
- journalTransaction, orderedFiles, values);
-
if (healthy)
{
journalTransaction.prepare(file);
@@ -1098,8 +1067,7 @@
Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
if (tx != null)
- {
-
+ {
JournalTransaction journalTransaction = transactionInfos.remove(transactionID);
if (journalTransaction == null)
@@ -1107,10 +1075,8 @@
throw new IllegalStateException("Cannot find tx " + transactionID);
}
- boolean healthy = checkTransactionHealth(
- journalTransaction, orderedFiles, values);
-
-
+ boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, values);
+
if (healthy)
{
for (RecordInfo txRecord: tx.recordInfos)
@@ -1165,7 +1131,7 @@
default:
{
throw new IllegalStateException("Journal " + file.getFile().getFileName() +
- " is corrupt, invalid record type " + recordType);
+ " is corrupt, invalid record type " + recordType);
}
}
@@ -1467,7 +1433,6 @@
this.autoReclaim = false;
}
-
// MessagingComponent implementation ---------------------------------------------------
public synchronized boolean isStarted()
@@ -1485,7 +1450,6 @@
this.filesExecutor = Executors.newSingleThreadExecutor();
state = STATE_STARTED;
-
}
public synchronized void stop() throws Exception
@@ -1527,7 +1491,7 @@
// Private -----------------------------------------------------------------------------
// Discard the old JournalFile and set it with a new ID
- private JournalFile reinitializeFile(JournalFile file) throws Exception
+ private JournalFile reinitializeFile(final JournalFile file) throws Exception
{
int newOrderingID = generateOrderingID();
@@ -1548,35 +1512,42 @@
jf.setOffset(bytesWritten);
sf.close();
+
return jf;
}
@SuppressWarnings("unchecked")
- private Pair<Integer, Integer>[] readReferencesOnTransaction(int variableSize, ByteBuffer bb)
+ private Pair<Integer, Integer>[] readReferencesOnTransaction(final int variableSize, final ByteBuffer bb)
{
int numberOfFiles = variableSize / (SIZE_INT * 2);
+
Pair<Integer, Integer> values[] = (Pair<Integer, Integer> [])new Pair[numberOfFiles];
+
for (int i = 0; i < numberOfFiles; i++)
{
values[i] = new Pair(bb.getInt(), bb.getInt());
}
+
return values;
}
- private boolean checkTransactionHealth(
- JournalTransaction journalTransaction, List<JournalFile> orderedFiles,
- Pair<Integer, Integer>[] readReferences)
+ private boolean checkTransactionHealth(final JournalTransaction journalTransaction,
+ final List<JournalFile> orderedFiles,
+ final Pair<Integer, Integer>[] readReferences)
{
boolean healthy = true;
+
Map<Integer, AtomicInteger> refMap = journalTransaction.getElementsSummary();
for (Pair<Integer, Integer> ref: readReferences)
{
AtomicInteger counter = refMap.get(ref.a);
+
if (counter == null)
{
// Couldn't find the counter, but if part of the transaction was reclaimed it is ok!
boolean found = false;
+
for (JournalFile lookupFile: orderedFiles)
{
if (lookupFile.getOrderingID() == ref.a)
@@ -1643,7 +1614,7 @@
return recordType >= ADD_RECORD && recordType <= UPDATE_RECORD_TX;
}
- private int getRecordSize(byte recordType)
+ private int getRecordSize(final byte recordType)
{
// The record size (without the variable portion)
int recordSize = 0;
@@ -1733,8 +1704,7 @@
* You need to call lock.acquire before calling this method
* */
private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
- {
-
+ {
int size = bb.capacity();
checkFile(size);
bb.position(SIZE_BYTE);
@@ -1760,7 +1730,7 @@
return currentFile;
}
- private JournalFile createFile(boolean keepOpened) throws Exception
+ private JournalFile createFile(final boolean keepOpened) throws Exception
{
int orderingID = generateOrderingID();
@@ -1794,7 +1764,7 @@
return info;
}
- private void openFile(JournalFile file) throws Exception
+ private void openFile(final JournalFile file) throws Exception
{
file.getFile().open();
file.getFile().position(file.getFile().calculateBlockStart(SIZE_HEADER));
@@ -1838,6 +1808,7 @@
private JournalFile enqueueOpenFile() throws InterruptedException
{
if (trace) log.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
+
filesExecutor.execute(new Runnable()
{
public void run()
@@ -1852,6 +1823,7 @@
}
}
});
+
if (autoReclaim)
{
filesExecutor.execute(new Runnable()
@@ -1880,12 +1852,10 @@
log.warn("Couldn't open a file in 60 Seconds", new Exception ("Warning: Couldn't open a file in 60 Seconds"));
}
}
-
-
+
return nextFile;
}
-
-
+
/**
*
* Open a file and place it into the openedFiles queue
@@ -1912,8 +1882,7 @@
openedFiles.offer(nextOpenedFile);
}
-
-
+
private void closeFile(final JournalFile file)
{
this.filesExecutor.execute(new Runnable() { public void run()
@@ -2072,7 +2041,6 @@
{
return this.invalid;
}
-
public Map<Integer, AtomicInteger> getElementsSummary()
{
@@ -2151,7 +2119,7 @@
}
}
- public void rollback(JournalFile file)
+ public void rollback(final JournalFile file)
{
//Now add negs for the pos we added in each file in which there were transactional operations
//Note that we do this on rollback as we do on commit, since we need to ensure the file containing
@@ -2165,7 +2133,7 @@
}
}
- public void prepare(JournalFile file)
+ public void prepare(final JournalFile file)
{
//We don't want the prepare record getting deleted before time
@@ -2199,20 +2167,18 @@
}
}
- private AtomicInteger getCounter(JournalFile file)
+ private AtomicInteger getCounter(final JournalFile file)
{
AtomicInteger value = numberOfElements.get(file.getOrderingID());
if (value == null)
{
value = new AtomicInteger();
- numberOfElements.put(file.getOrderingID(), value);
-
+ numberOfElements.put(file.getOrderingID(), value);
}
return value;
}
}
-
}
More information about the jboss-cvs-commits
mailing list