[jboss-cvs] JBoss Messaging SVN: r4925 - in trunk/src/main/org/jboss/messaging/core/journal: impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Sep 10 05:29:43 EDT 2008
Author: timfox
Date: 2008-09-10 05:29:43 -0400 (Wed, 10 Sep 2008)
New Revision: 4925
Modified:
trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java
trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
Log:
Some tweaks to the journal
Modified: trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java 2008-09-09 22:41:33 UTC (rev 4924)
+++ trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java 2008-09-10 09:29:43 UTC (rev 4925)
@@ -38,15 +38,17 @@
public class PreparedTransactionInfo
{
public final long id;
- public final byte[] xidData;
+ public final byte[] extraData;
+
public final List<RecordInfo> records = new ArrayList<RecordInfo>();
public final Set<Long> recordsToDelete = new HashSet<Long>();
- public PreparedTransactionInfo(final long id, final byte[] xidData)
+ public PreparedTransactionInfo(final long id, final byte[] extraData)
{
this.id = id;
- this.xidData = xidData;
+
+ this.extraData = extraData;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2008-09-09 22:41:33 UTC (rev 4924)
+++ trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2008-09-10 09:29:43 UTC (rev 4925)
@@ -75,6 +75,8 @@
// These add methods are only used by testCases
+ //FIXME: These methods should be removed - they are only used by tests
+
void appendAddRecord(long id, byte recordType, byte[] record) throws Exception;
void appendUpdateRecord(long id, byte recordType, byte[] record) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-09-09 22:41:33 UTC (rev 4924)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-09-10 09:29:43 UTC (rev 4925)
@@ -71,6 +71,7 @@
* <p>WIKI Page: <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Journal"> http://wiki.jboss.org/auth/wiki/JBossMessaging2Journal</a></p>
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
@@ -84,6 +85,19 @@
private static final int STATE_LOADED = 2;
+ // Static --------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(JournalImpl.class);
+
+ private static final boolean trace = log.isTraceEnabled();
+
+ // This method exists just to make debug easier.
+ // I could replace log.trace by log.info temporarily while I was debugging Journal
+ private static final void trace(String message)
+ {
+ log.trace(message);
+ }
+
// The sizes of primitive types
private static final int SIZE_LONG = 8;
@@ -139,11 +153,10 @@
public static final byte ROLLBACK_RECORD = 19;
public static final byte FILL_CHARACTER = 74; // Letter 'J'
-
-
+
// Attributes ----------------------------------------------------
- private boolean autoReclaim = true;
+ private volatile boolean autoReclaim = true;
private final AtomicInteger nextOrderingId = new AtomicInteger(0);
@@ -183,11 +196,6 @@
/** Object that will control buffer's callback and getting buffers from the queue */
private final ReuseBuffersController buffersControl = new ReuseBuffersController();
- /*
- * We use a semaphore rather than synchronized since it performs better when
- * contended
- */
-
//TODO - improve concurrency by allowing concurrent accesses if doesn't change current file
private final Semaphore lock = new Semaphore(1, true);
@@ -198,20 +206,7 @@
private final AtomicLong transactionIDSequence = new AtomicLong(0);
private final Reclaimer reclaimer = new Reclaimer();
-
- // Static --------------------------------------------------------
-
- private static final Logger log = Logger.getLogger(JournalImpl.class);
-
- private static final boolean trace = log.isTraceEnabled();
-
- // This method exists just to make debug easier.
- // I could replace log.trace by log.info temporarily while I was debugging Journal
- private static final void trace(String message)
- {
- log.trace(message);
- }
-
+
// Constructors --------------------------------------------------
public JournalImpl(final int fileSize, final int minFiles,
@@ -285,13 +280,14 @@
ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
bb.putByte(ADD_RECORD);
- bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ //bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putInt(-1); // skip ID part
bb.putLong(id);
bb.putInt(recordLength);
bb.putByte(recordType);
record.encode(bb);
bb.putInt(size);
- bb.rewind();
+ bb.rewind(); // TODO is rewind necessary?
try
{
@@ -326,13 +322,14 @@
ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
bb.putByte(UPDATE_RECORD);
- bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ // bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putInt(-1); // skip ID part
bb.putLong(id);
bb.putInt(record.getEncodeSize());
bb.putByte(recordType);
record.encode(bb);
bb.putInt(size);
- bb.rewind();
+ bb.rewind(); //Is this necessary?
lock.acquire();
@@ -367,10 +364,11 @@
ByteBuffer bb = newBuffer(size);
bb.put(DELETE_RECORD);
- bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ //bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putInt(-1); // skip ID part
bb.putLong(id);
bb.putInt(size);
- bb.rewind();
+ bb.rewind(); //Is this necessary?
lock.acquire();
@@ -406,14 +404,15 @@
ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
bb.putByte(ADD_RECORD_TX);
- bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ // bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putInt(-1); // skip ID part
bb.putLong(txID);
bb.putLong(id);
bb.putInt(recordLength);
bb.putByte(recordType);
record.encode(bb);
bb.putInt(size);
- bb.rewind();
+ bb.rewind(); //Is this necessary?
lock.acquire();
@@ -431,7 +430,7 @@
}
}
- public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, EncodingSupport record) throws Exception
+ public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, final EncodingSupport record) throws Exception
{
if (state != STATE_LOADED)
{
@@ -443,14 +442,15 @@
ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
bb.putByte(UPDATE_RECORD_TX);
- bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ //bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putInt(-1); // skip ID part
bb.putLong(txID);
bb.putLong(id);
bb.putInt(record.getEncodeSize());
bb.putByte(recordType);
record.encode(bb);
bb.putInt(size);
- bb.rewind();
+ bb.rewind(); //Is this necessary?
lock.acquire();
@@ -480,11 +480,12 @@
ByteBuffer bb = newBuffer(size);
bb.put(DELETE_RECORD_TX);
- bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ //bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putInt(-1);
bb.putLong(txID);
bb.putLong(id);
bb.putInt(size);
- bb.rewind();
+ bb.rewind(); //Is this necessary
lock.acquire();
@@ -507,13 +508,13 @@
* <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction
* back to a state it could be committed. </p>
*
- * <p> transactionData is usually a safe space you could use to store things like XIDs or any other supporting user-data required to replay the transaction </p>
+ * <p> transactionData allows you to store any other supporting user-data related to the transaction</p>
*
* @param txID
- * @param transactionData Information to support the system replaying the transaction
+ * @param transactionData - extra user data for the prepare
* @throws Exception
*/
- public void appendPrepareRecord(final long txID, EncodingSupport transactionData) throws Exception
+ public void appendPrepareRecord(final long txID, final EncodingSupport transactionData) throws Exception
{
if (state != STATE_LOADED)
{
@@ -589,10 +590,11 @@
ByteBuffer bb = newBuffer(size);
bb.put(ROLLBACK_RECORD);
- bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ //bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putInt(-1); // skip ID part
bb.putLong(txID);
bb.putInt(size);
- bb.rewind();
+ bb.rewind(); // Is this necessary?
lock.acquire();
@@ -649,7 +651,7 @@
return maxID;
}
-
+
public synchronized long load(final LoadManager loadManager) throws Exception
{
if (state != STATE_STARTED)
@@ -677,6 +679,8 @@
if (bytesRead != fileSize)
{
+ //FIXME - shouldn't be just ignore the file and log a warning, rather than throw ISE?
+ //We don't want to leave the user with an unusable system
throw new IllegalStateException("File is wrong size " + bytesRead +
" expected " + fileSize + " : " + file.getFile().getFileName());
}
@@ -691,6 +695,7 @@
final int pos = bb.position();
byte recordType = bb.get();
+
if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
{
// I - We scan for any valid record on the file. If a hole happened on the middle of the file we keep looking until all the possibilities are gone
@@ -726,30 +731,37 @@
{
continue;
}
+
transactionID = bb.getLong();
+
maxTransactionID = Math.max(maxTransactionID, transactionID);
}
long recordID = 0;
+
if (!isCompleteTransaction(recordType))
{
if (bb.position() + SIZE_LONG > fileSize)
{
continue;
}
+
recordID = bb.getLong();
+
maxID = Math.max(maxID, recordID);
}
-
// We use the size of the record to validate the health of the record.
// (V) We verify the size of the record
// The variable record portion used on Updates and Appends
int variableSize = 0;
- // Used to hold XIDs on PrepareTransactions
- int preparedTransactionDataSize = 0;
+
+ // Used to hold extra data on transaction prepares
+ int preparedTransactionExtraDataSize = 0;
+
byte userRecordType = 0;
+
byte record[] = null;
if (isContainsBody(recordType))
@@ -763,21 +775,24 @@
if (bb.position() + variableSize > fileSize)
{
+ //TODO - isn't this an error?
continue;
}
userRecordType = bb.get();
record = new byte[variableSize];
+
bb.get(record);
}
-
+
if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
{
- if(recordType == PREPARE_RECORD)
+ if (recordType == PREPARE_RECORD)
{
- preparedTransactionDataSize = bb.getInt();
- }
+ preparedTransactionExtraDataSize = bb.getInt();
+ }
+ //Comment required: I'm guessing this is because commits and prepares also include the record ids?
variableSize += bb.getInt() * SIZE_INT * 2;
}
@@ -785,38 +800,45 @@
// VI - this is completing V, We will validate the size at the end of the record,
// But we avoid buffer overflows by damaged data
- if (pos + recordSize + variableSize + preparedTransactionDataSize > fileSize)
+ if (pos + recordSize + variableSize + preparedTransactionExtraDataSize > fileSize)
{
// Avoid a buffer overflow caused by damaged data... continue scanning for more records...
+
+ //TODO - isn't this an error?
+
continue;
}
int oldPos = bb.position();
- bb.position(pos + variableSize + recordSize + preparedTransactionDataSize - SIZE_INT);
+ bb.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
int checkSize = bb.getInt();
// VII - The checkSize at the end has to match with the size informed at the beggining.
// This is like testing a hash for the record. (We could replace the checkSize by some sort of calculated hash)
- if (checkSize != variableSize + recordSize + preparedTransactionDataSize)
+ if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
{
log.warn("Record at position " + pos + " file:" + file.getFile().getFileName() + " is corrupted and it is being ignored");
+
// If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
+
hasData = true;
+
bb.position(pos + SIZE_BYTE);
+
continue;
}
bb.position(oldPos);
+
+ // At this point everything is checked. So we relax and just load the data now.
-
- // At this point everything is already check. So relax and just load the data now.
-
- switch(recordType)
+ switch (recordType)
{
case ADD_RECORD:
{
+
loadManager.addRecord(new RecordInfo(recordID, userRecordType, record, false));
posFilesMap.put(recordID, new PosFiles(file));
@@ -828,7 +850,9 @@
case UPDATE_RECORD:
{
loadManager.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
+
hasData = true;
+
file.incPosCount();
PosFiles posFiles = posFilesMap.get(recordID);
@@ -846,6 +870,7 @@
case DELETE_RECORD:
{
loadManager.deleteRecord(recordID);
+
hasData = true;
PosFiles posFiles = posFilesMap.remove(recordID);
@@ -864,11 +889,12 @@
if (tx == null)
{
- tx = new TransactionHolder(transactionID);
+ tx = new TransactionHolder(transactionID);
+
transactions.put(transactionID, tx);
}
- tx.recordInfos.add(new RecordInfo(recordID, userRecordType, record, recordType==UPDATE_RECORD_TX?true:false));
+ tx.recordInfos.add(new RecordInfo(recordID, userRecordType, record, recordType == UPDATE_RECORD_TX));
JournalTransaction tnp = transactionInfos.get(transactionID);
@@ -891,7 +917,8 @@
if (tx == null)
{
- tx = new TransactionHolder(transactionID);
+ tx = new TransactionHolder(transactionID);
+
transactions.put(transactionID, tx);
}
@@ -915,52 +942,50 @@
case PREPARE_RECORD:
{
TransactionHolder tx = transactions.get(transactionID);
-
-
+
if (tx == null)
{
- tx = new TransactionHolder(transactionID);
+ tx = new TransactionHolder(transactionID);
+
transactions.put(transactionID, tx);
}
-
-
+
// We need to read it even if transaction was not found, or the reading checks would fail
- byte xidData[] = new byte[preparedTransactionDataSize];
- bb.get(xidData);
+ byte extraData[] = new byte[preparedTransactionExtraDataSize];
+
+ bb.get(extraData);
// Pair <FileID, NumberOfElements>
Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
-
- if (tx != null)
- {
- tx.prepared = true;
- tx.xidData = xidData;
- JournalTransaction journalTransaction = transactionInfos.get(transactionID);
+
+ tx.prepared = true;
+
+ tx.extraData = extraData;
+
+ JournalTransaction journalTransaction = transactionInfos.get(transactionID);
+
+ if (journalTransaction == null)
+ {
+ journalTransaction = new JournalTransaction();
-
- if (journalTransaction == null)
- {
- journalTransaction = new JournalTransaction();
-
- transactionInfos.put(transactionID, journalTransaction);
- }
-
- boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, recordedSummary);
-
- if (healthy)
- {
- journalTransaction.prepare(file);
- }
- else
- {
- log.warn("Prepared transaction " + healthy + " wasn't considered completed, it will be ignored");
- tx.invalid = true;
- }
-
- hasData = true;
+ transactionInfos.put(transactionID, journalTransaction);
}
+ boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, recordedSummary);
+
+ if (healthy)
+ {
+ journalTransaction.prepare(file);
+ }
+ else
+ {
+ log.warn("Prepared transaction " + healthy + " wasn't considered completed, it will be ignored");
+ tx.invalid = true;
+ }
+
+ hasData = true;
+
break;
}
case COMMIT_RECORD:
@@ -996,26 +1021,31 @@
}
}
- for (Long deleteValue: tx.recordsToDelete)
+ for (long deleteValue: tx.recordsToDelete)
{
loadManager.deleteRecord(deleteValue);
}
+
journalTransaction.commit(file);
}
else
{
log.warn("Transaction " + transactionID + " is missing elements so the transaction is being ignored");
+
journalTransaction.forget();
}
hasData = true;
}
+ else
+ {
+ //TODO isn't this an error?
+ }
break;
}
case ROLLBACK_RECORD:
- {
-
+ {
TransactionHolder tx = transactions.remove(transactionID);
if (tx != null)
@@ -1032,6 +1062,10 @@
hasData = true;
}
+ else
+ {
+ //TODO is this an error?
+ }
break;
}
@@ -1046,7 +1080,7 @@
// This is a sanity check about the loading code itself.
// If this checkSize doesn't match, it means the reading method is not doing what it was supposed to do
- if (checkSize != variableSize + recordSize + preparedTransactionDataSize)
+ if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
{
throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
}
@@ -1113,6 +1147,7 @@
else
{
currentFile = freeFiles.remove();
+
openFile(currentFile);
}
@@ -1124,7 +1159,7 @@
{
log.warn("Uncommitted transaction with id " + transaction.transactionID + " found and discarded");
- JournalTransaction transactionInfo = this.transactionInfos.get(transaction.transactionID);
+ JournalTransaction transactionInfo = transactionInfos.get(transaction.transactionID);
if (transactionInfo == null)
{
@@ -1139,7 +1174,7 @@
}
else
{
- PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.xidData);
+ PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);
info.records.addAll(transaction.recordInfos);
@@ -1163,8 +1198,7 @@
// TestableJournal implementation --------------------------------------------------------------
-
- public void setAutoReclaim(boolean autoReclaim)
+ public void setAutoReclaim(final boolean autoReclaim)
{
this.autoReclaim = autoReclaim;
}
@@ -1348,6 +1382,8 @@
// Add/update methods only used on testcases (using a byte[]). Those methods are now part of the Test interface ------------------------
+ //FIXME - why have these methods?? Why not just use the normal public interface methods
+ //They should be removed
public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
{
@@ -1384,7 +1420,7 @@
throw new IllegalStateException("Journal is not stopped");
}
- this.filesExecutor = Executors.newSingleThreadExecutor();
+ filesExecutor = Executors.newSingleThreadExecutor();
state = STATE_STARTED;
}
@@ -1402,9 +1438,10 @@
}
filesExecutor.shutdown();
- while (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
+
+ if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
{
- log.warn("Couldn't stop Journal after 60 seconds", new Exception ("Warning: Couldn't stop journal after 60 Seconds"));
+ log.warn("Couldn't stop journal executor after 60 seconds");
}
for (JournalFile file: openedFiles)
@@ -1469,6 +1506,9 @@
// This line aways throws a compilation-warning, even thought is a completely legal operation.
// We could remove the SuppressWarnings as soon as we find better expression on this line:
+
+ //TODO sure - if something throws a warning doesn't mean it's illegal - if it were illegal it wouldn't compile!
+
Pair<Integer, Integer> values[] = (Pair<Integer, Integer> [])new Pair[numberOfFiles];
for (int i = 0; i < numberOfFiles; i++)
@@ -1485,6 +1525,9 @@
* <p> We record a summary about the records on the journal file on COMMIT and PREPARE.
* When we load the records we build a new summary and we check the original summary to the current summary.
* This method is basically verifying if the entire transaction is being loaded </p>
+ *
+ * Summary means what? More info please
+ *
* @param journalTransaction
* @param orderedFiles
* @param recordedSummary
@@ -1495,8 +1538,7 @@
final Pair<Integer, Integer>[] recordedSummary)
{
boolean healthy = true;
-
-
+
// (I) First we get the summary of what we really have on the files now:
// FileID, NumberOfElements
@@ -1527,6 +1569,7 @@
if (found)
{
healthy = false;
+
break;
}
}
@@ -1574,7 +1617,8 @@
ByteBuffer bb = newBuffer(size);
bb.put(recordType);
- bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ // bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putInt(-1); // skip ID part
bb.putLong(txID);
bb.putInt(tx.getElementsSummary().size());
@@ -1586,7 +1630,7 @@
}
bb.putInt(size);
- bb.rewind();
+ bb.rewind(); //Is this necessary?
return bb;
}
@@ -1603,7 +1647,8 @@
ByteBuffer bb = newBuffer(size);
bb.put(recordType);
- bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ //bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putInt(-1); // skip ID part
bb.putLong(txID);
bb.putInt(xidSize);
bb.putInt(tx.getElementsSummary().size());
@@ -1616,7 +1661,7 @@
}
bb.putInt(size);
- bb.rewind();
+ bb.rewind(); //Is this necessary?
return bb;
}
@@ -1720,6 +1765,7 @@
}
Collections.sort(orderedFiles, new JournalFileComparator());
+
return orderedFiles;
}
@@ -1729,17 +1775,24 @@
private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
{
int size = bb.limit();
+
checkFile(size);
+
bb.position(SIZE_BYTE);
+
if (currentFile == null)
{
- throw new Exception ("Current file = null");
+ throw new IllegalStateException("Current file = null");
}
+
bb.putInt(currentFile.getOrderingID());
+
bb.rewind();
+
if (callback != null)
{
currentFile.getFile().write(bb, callback);
+
if (sync)
{
callback.waitCompletion();
@@ -1749,7 +1802,9 @@
{
currentFile.getFile().write(bb, sync);
}
+
currentFile.extendOffset(size);
+
return currentFile;
}
@@ -1766,7 +1821,7 @@
String fileName = filePrefix + "-" + orderingID + "." + fileExtension;
- if (trace) trace("Creating file " + fileName);
+ if (trace) { trace("Creating file " + fileName); }
SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, maxAIO);
@@ -1797,8 +1852,11 @@
private void openFile(final JournalFile file) throws Exception
{
file.getFile().open();
+
file.getFile().position(file.getFile().calculateBlockStart(SIZE_HEADER));
+
file.setOffset(file.getFile().calculateBlockStart(SIZE_HEADER));
+
if (this.reuseBufferSize > 0)
{
file.getFile().setBufferCallback(buffersControl.callback);
@@ -1925,7 +1983,7 @@
private void closeFile(final JournalFile file)
{
- this.filesExecutor.execute(new Runnable() { public void run()
+ filesExecutor.execute(new Runnable() { public void run()
{
try
{
@@ -1949,6 +2007,7 @@
tx = new JournalTransaction();
JournalTransaction trans = transactionInfos.putIfAbsent(txID, tx);
+
if (trans != null)
{
tx = trans;
@@ -1967,7 +2026,9 @@
if (callback == null)
{
callback = new TransactionCallback();
+
TransactionCallback callbackCheck = transactionCallbacks.putIfAbsent(transactionId, callback);
+
if (callbackCheck != null)
{
callback = callbackCheck;
@@ -1980,6 +2041,7 @@
}
callback.countUp();
+
return callback;
}
else
@@ -1988,17 +2050,13 @@
}
}
- public ByteBuffer newBuffer(int size)
+ public ByteBuffer newBuffer(final int size)
{
- return this.buffersControl.newBuffer(size);
+ return buffersControl.newBuffer(size);
}
- // ------------------------------------------------------------------------------------
-
-
// Inner classes ---------------------------------------------------------------------------
-
-
+
// Just encapsulates the VariableLatch waiting for transaction completions
// Used if the SequentialFile supports Callbacks
private static class TransactionCallback implements IOCallback
@@ -2032,7 +2090,9 @@
public void onError(final int errorCode, final String errorMessage)
{
this.errorMessage = errorMessage;
+
this.errorCode = errorCode;
+
countLatch.down();
}
@@ -2077,7 +2137,6 @@
}
}
}
-
/** Class that will control buffer-reuse */
class ReuseBuffersController
@@ -2091,14 +2150,16 @@
final BufferCallback callback = new LocalBufferCallback();
- public ByteBuffer newBuffer(int size)
+ public ByteBuffer newBuffer(final int size)
{
// if a new buffer wasn't requested in 10 seconds, we clear the queue
// This is being done this way as we don't need another Timeout Thread just to cleanup this
if (reuseBufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
{
log.debug("Clearing reuse buffers queue with " + reuseBuffers.size() + " elements");
+
bufferReuseLastTime = System.currentTimeMillis();
+
reuseBuffers.clear();
}
@@ -2109,22 +2170,24 @@
}
else
{
-
// We need to allocate buffers following the rules of the storage being used (AIO/NIO)
int alignedSize = fileFactory.calculateBlockSize(size);
// Try getting a buffer from the queue...
ByteBuffer buffer = this.reuseBuffers.poll();
+
if (buffer == null)
{
// if empty create a new one.
buffer = fileFactory.newBuffer(reuseBufferSize);
+
buffer.limit(alignedSize);
}
else
{
// set the limit of the buffer to the size being required
buffer.limit(alignedSize);
+
fileFactory.clearBuffer(buffer);
}
@@ -2136,23 +2199,20 @@
private class LocalBufferCallback implements BufferCallback
{
-
public void bufferDone(ByteBuffer buffer)
{
bufferReuseLastTime = System.currentTimeMillis();
+
// If a buffer has any other than the configured size, the buffer will be just sent to GC
// This could happen if
if (buffer.capacity() == reuseBufferSize)
{
reuseBuffers.offer(buffer);
}
- }
-
- }
+ }
+ }
+ }
- }
-
-
private class JournalTransaction
{
private List<Pair<JournalFile, Long>> pos;
@@ -2161,13 +2221,13 @@
private Set<JournalFile> transactionPos;
- // Number of elements participating on the transaction
+ // Map of file id to number of elements participating on the transaction in that file
// Used to verify completion on reload
- private final Map<Integer, AtomicInteger> numberOfElements = new HashMap<Integer, AtomicInteger>();
+ private final Map<Integer, AtomicInteger> numberOfElementsPerFile = new HashMap<Integer, AtomicInteger>();
public Map<Integer, AtomicInteger> getElementsSummary()
{
- return numberOfElements;
+ return numberOfElementsPerFile;
}
public void addPositive(final JournalFile file, final long id)
@@ -2290,12 +2350,12 @@
private AtomicInteger getCounter(final JournalFile file)
{
- AtomicInteger value = numberOfElements.get(file.getOrderingID());
+ AtomicInteger value = numberOfElementsPerFile.get(file.getOrderingID());
if (value == null)
{
value = new AtomicInteger();
- numberOfElements.put(file.getOrderingID(), value);
+ numberOfElementsPerFile.put(file.getOrderingID(), value);
}
return value;
@@ -2303,9 +2363,10 @@
}
+
+ //FIXME: This class should be removed it is only used by methods called by tests - move it to the test not the core code !!
private static class ByteArrayEncoding implements EncodingSupport
{
-
final byte[] data;
ByteArrayEncoding(final byte[] data)
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java 2008-09-09 22:41:33 UTC (rev 4924)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java 2008-09-10 09:29:43 UTC (rev 4925)
@@ -22,14 +22,13 @@
package org.jboss.messaging.core.journal.impl;
-import org.jboss.messaging.core.journal.RecordInfo;
-
-import javax.transaction.xa.Xid;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.jboss.messaging.core.journal.RecordInfo;
+
/**
*
* A TransactionHolder
@@ -56,6 +55,6 @@
public boolean invalid;
- public byte[] xidData;
+ public byte[] extraData;
}
More information about the jboss-cvs-commits mailing list