[jboss-cvs] JBoss Messaging SVN: r4722 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jul 24 08:39:56 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-07-24 08:39:56 -0400 (Thu, 24 Jul 2008)
New Revision: 4722
Added:
trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalAsyncTimeoutsTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/tests/src/org/jboss/messaging/tests/stress/journal/ValidateTransactionHealthTest.java
trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
Log:
Journal work (dealing with transaction timeouts and Journal Reload)
Modified: trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java 2008-07-24 12:37:05 UTC (rev 4721)
+++ trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java 2008-07-24 12:39:56 UTC (rev 4722)
@@ -36,4 +36,14 @@
void updateRecord(RecordInfo info);
void addPreparedTransaction(PreparedTransactionInfo preparedTransaction);
+
+ /**
+ * Called by the journal load when it has to start reading the journal files again from the beginning.
+ *
+ * This may happen in the rare situation where a transaction commit timed out on AIO,
+ * and right after that a rollback was fired, but the earlier commit was still completed after its TransactionCallback had already been forgotten.
+ *
+ * This is because libaio's forget method is not working, so we have to come up with this "hack".
+ *
+ * */
+ void restart();
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-24 12:37:05 UTC (rev 4721)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-24 12:39:56 UTC (rev 4722)
@@ -575,7 +575,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- JournalTransaction tx = transactionInfos.remove(txID);
+ JournalTransaction tx = transactionInfos.get(txID);
if (tx == null)
{
@@ -584,6 +584,7 @@
JournalFile usedFile = writeTransaction(COMMIT_RECORD, txID, tx);
+ transactionInfos.remove(txID);
transactionCallbacks.remove(txID);
tx.commit(usedFile);
@@ -597,7 +598,7 @@
throw new IllegalStateException("Journal must be loaded first");
}
- JournalTransaction tx = transactionInfos.remove(txID);
+ JournalTransaction tx = transactionInfos.get(txID);
if (tx == null)
{
@@ -616,6 +617,7 @@
JournalFile usedFile = appendRecord(bb, syncTransactional, getTransactionCallback(txID));
+ transactionInfos.remove(txID);
transactionCallbacks.remove(txID);
tx.rollback(usedFile);
@@ -652,6 +654,12 @@
{
recordsToDelete.add(id);
}
+
+ // Empties both collections filled by this LoadManager (recordsToDelete and records),
+ // so that a repeated load pass starts from a clean state.
+ public void restart()
+ {
+ recordsToDelete.clear();
+ records.clear();
+ }
});
@@ -675,393 +683,440 @@
throw new IllegalStateException("Journal must be in started state");
}
- Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
+ boolean fileConsistent = true;
- List<JournalFile> orderedFiles = orderFiles();
+ Map<Long, TransactionHolder> transactions = null;
int lastDataPos = -1;
-
+
+ long maxMessageID = -1;
+
long maxTransactionID = -1;
- long maxMessageID = -1;
-
- for (JournalFile file: orderedFiles)
- {
- file.getFile().open();
+ HashSet<Long> commitsToForget = new HashSet<Long>();
+ HashSet<Long> performedCommits = new HashSet<Long>();
+
+ do
+ {
+
+ if (!fileConsistent)
+ {
+ loadManager.restart();
+ }
- ByteBuffer bb = fileFactory.newBuffer(fileSize);
+ fileConsistent = true;
- int bytesRead = file.getFile().read(bb);
+ performedCommits.clear();
- if (bytesRead != fileSize)
- {
- //deal with this better
-
- throw new IllegalStateException("File is wrong size " + bytesRead +
- " expected " + fileSize + " : " + file.getFile().getFileName());
- }
+ dataFiles.clear();
+ freeFiles.clear();
+ currentFile = null;
- //First long is the ordering timestamp, we just jump its position
- bb.position(file.getFile().calculateBlockStart(SIZE_HEADER));
+ transactions = new LinkedHashMap<Long, TransactionHolder>();
- boolean hasData = false;
+ List<JournalFile> orderedFiles = orderFiles();
- while (bb.hasRemaining())
- {
- final int pos = bb.position();
+ lastDataPos = -1;
+
+ maxTransactionID = -1;
+
+ maxMessageID = -1;
+
+ for (JournalFile file: orderedFiles)
+ {
+ file.getFile().open();
- byte recordType = bb.get();
- if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
- {
- if (trace)
- {
- log.trace("Invalid record type at " + bb.position() + " file:" + file);
- }
- continue;
- }
-
- if (bb.position() + SIZE_INT > fileSize)
- {
- continue;
- }
-
- int readFileId = bb.getInt();
+ ByteBuffer bb = fileFactory.newBuffer(fileSize);
- if (readFileId != file.getOrderingID())
+ int bytesRead = file.getFile().read(bb);
+
+ if (bytesRead != fileSize)
{
- // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
- hasData = true;
-
- bb.position(pos + 1);
- //log.info("Record read at position " + pos + " doesn't belong to this current journal file, ignoring it!");
- continue;
+ //deal with this better
+
+ throw new IllegalStateException("File is wrong size " + bytesRead +
+ " expected " + fileSize + " : " + file.getFile().getFileName());
}
- long transactionID = 0;
+ //First long is the ordering timestamp, we just jump its position
+ bb.position(file.getFile().calculateBlockStart(SIZE_HEADER));
- if (isTransaction(recordType))
- {
- if (bb.position() + SIZE_LONG > fileSize)
- {
- continue;
- }
- transactionID = bb.getLong();
- maxTransactionID = Math.max(maxTransactionID, transactionID);
- }
+ boolean hasData = false;
- long recordID = 0;
- if (!isCompleteTransaction(recordType))
+ while (bb.hasRemaining())
{
- if (bb.position() + SIZE_LONG > fileSize)
+ final int pos = bb.position();
+
+ byte recordType = bb.get();
+ if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
{
+ if (trace)
+ {
+ log.trace("Invalid record type at " + bb.position() + " file:" + file);
+ }
continue;
}
- recordID = bb.getLong();
- maxMessageID = Math.max(maxMessageID, recordID);
- }
-
- // The variable record portion used on Updates and Appends
- int variableSize = 0;
- byte userRecordType = 0;
- byte record[] = null;
-
- if (isContainsBody(recordType))
- {
+
if (bb.position() + SIZE_INT > fileSize)
{
continue;
}
+
+ int readFileId = bb.getInt();
- variableSize = bb.getInt();
-
- if (bb.position() + variableSize > fileSize)
+ if (readFileId != file.getOrderingID())
{
+ // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
+ hasData = true;
+
+ bb.position(pos + 1);
+ //log.info("Record read at position " + pos + " doesn't belong to this current journal file, ignoring it!");
continue;
}
- userRecordType = bb.get();
+ long transactionID = 0;
- record = new byte[variableSize];
- bb.get(record);
- }
-
- if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
- {
- variableSize = bb.getInt() * SIZE_INT * 2;
- }
-
- int recordSize = getRecordSize(recordType);
-
- if (pos + recordSize + variableSize > fileSize)
- {
- continue;
- }
-
- int oldPos = bb.position();
-
- bb.position(pos + variableSize + recordSize - SIZE_INT);
-
- int checkSize = bb.getInt();
-
- if (checkSize != variableSize + recordSize)
- {
- log.warn("Record at position " + pos + " file:" + file.getFile().getFileName() + " is corrupted and it is being ignored");
- // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
- hasData = true;
- bb.position(pos + SIZE_BYTE);
- continue;
- }
-
- bb.position(oldPos);
-
- switch(recordType)
- {
- case ADD_RECORD:
- {
- loadManager.addRecord(new RecordInfo(recordID, userRecordType, record, false));
-
- posFilesMap.put(recordID, new PosFiles(file));
-
- hasData = true;
-
- break;
- }
- case UPDATE_RECORD:
+ if (isTransaction(recordType))
{
- loadManager.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
- hasData = true;
- file.incPosCount();
-
- PosFiles posFiles = posFilesMap.get(recordID);
-
- if (posFiles != null)
+ if (bb.position() + SIZE_LONG > fileSize)
{
- //It's legal for this to be null. The file(s) with the may have been deleted
- //just leaving some updates in this file
-
- posFiles.addUpdateFile(file);
+ continue;
}
-
- break;
- }
- case DELETE_RECORD:
+ transactionID = bb.getLong();
+ maxTransactionID = Math.max(maxTransactionID, transactionID);
+ }
+
+ long recordID = 0;
+ if (!isCompleteTransaction(recordType))
{
- loadManager.deleteRecord(recordID);
- hasData = true;
-
- PosFiles posFiles = posFilesMap.remove(recordID);
-
- if (posFiles != null)
+ if (bb.position() + SIZE_LONG > fileSize)
{
- posFiles.addDelete(file);
- }
-
- break;
- }
- case ADD_RECORD_TX:
- case UPDATE_RECORD_TX:
- {
- TransactionHolder tx = transactions.get(transactionID);
-
- if (tx == null)
- {
- tx = new TransactionHolder(transactionID);
- transactions.put(transactionID, tx);
+ continue;
}
-
- tx.recordInfos.add(new RecordInfo(recordID, userRecordType, record, recordType==UPDATE_RECORD_TX?true:false));
-
- JournalTransaction tnp = transactionInfos.get(transactionID);
-
- if (tnp == null)
+ recordID = bb.getLong();
+ maxMessageID = Math.max(maxMessageID, recordID);
+ }
+
+ // The variable record portion used on Updates and Appends
+ int variableSize = 0;
+ byte userRecordType = 0;
+ byte record[] = null;
+
+ if (isContainsBody(recordType))
+ {
+ if (bb.position() + SIZE_INT > fileSize)
{
- tnp = new JournalTransaction();
-
- transactionInfos.put(transactionID, tnp);
+ continue;
}
- tnp.addPositive(file, recordID);
+ variableSize = bb.getInt();
- hasData = true;
-
- break;
- }
- case DELETE_RECORD_TX:
- {
- TransactionHolder tx = transactions.get(transactionID);
-
- if (tx == null)
+ if (bb.position() + variableSize > fileSize)
{
- tx = new TransactionHolder(transactionID);
- transactions.put(transactionID, tx);
+ continue;
}
- tx.recordsToDelete.add(recordID);
+ userRecordType = bb.get();
- JournalTransaction tnp = transactionInfos.get(transactionID);
-
- if (tnp == null)
+ record = new byte[variableSize];
+ bb.get(record);
+ }
+
+ if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
+ {
+ variableSize = bb.getInt() * SIZE_INT * 2;
+ }
+
+ int recordSize = getRecordSize(recordType);
+
+ if (pos + recordSize + variableSize > fileSize)
+ {
+ continue;
+ }
+
+ int oldPos = bb.position();
+
+ bb.position(pos + variableSize + recordSize - SIZE_INT);
+
+ int checkSize = bb.getInt();
+
+ if (checkSize != variableSize + recordSize)
+ {
+ log.warn("Record at position " + pos + " file:" + file.getFile().getFileName() + " is corrupted and it is being ignored");
+ // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
+ hasData = true;
+ bb.position(pos + SIZE_BYTE);
+ continue;
+ }
+
+ bb.position(oldPos);
+
+ switch(recordType)
+ {
+ case ADD_RECORD:
+ {
+ loadManager.addRecord(new RecordInfo(recordID, userRecordType, record, false));
+
+ posFilesMap.put(recordID, new PosFiles(file));
+
+ hasData = true;
+
+ break;
+ }
+ case UPDATE_RECORD:
{
- tnp = new JournalTransaction();
+ loadManager.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
+ hasData = true;
+ file.incPosCount();
- transactionInfos.put(transactionID, tnp);
- }
-
- tnp.addNegative(file, recordID);
-
- hasData = true;
-
- break;
- }
- case PREPARE_RECORD:
- {
- TransactionHolder tx = transactions.get(transactionID);
-
- // We need to read it even if transaction was not found, or the reading checks would fail
- // Pair <OrderId, NumberOfElements>
- Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
-
- if (tx != null)
+ PosFiles posFiles = posFilesMap.get(recordID);
+
+ if (posFiles != null)
+ {
+ //It's legal for this to be null. The file(s) with the add record may have been deleted
+ //just leaving some updates in this file
+
+ posFiles.addUpdateFile(file);
+ }
+
+ break;
+ }
+ case DELETE_RECORD:
{
+ loadManager.deleteRecord(recordID);
+ hasData = true;
- tx.prepared = true;
+ PosFiles posFiles = posFilesMap.remove(recordID);
- JournalTransaction journalTransaction = transactionInfos.get(transactionID);
+ if (posFiles != null)
+ {
+ posFiles.addDelete(file);
+ }
- if (journalTransaction == null)
+ break;
+ }
+ case ADD_RECORD_TX:
+ case UPDATE_RECORD_TX:
+ {
+ TransactionHolder tx = transactions.get(transactionID);
+
+ if (tx == null)
{
- throw new IllegalStateException("Cannot find tx " + transactionID);
+ tx = new TransactionHolder(transactionID);
+ transactions.put(transactionID, tx);
}
+ tx.recordInfos.add(new RecordInfo(recordID, userRecordType, record, recordType==UPDATE_RECORD_TX?true:false));
- boolean healthy = checkTransactionHealth(
- journalTransaction, orderedFiles, values);
+ JournalTransaction tnp = transactionInfos.get(transactionID);
- if (healthy)
+ if (tnp == null)
{
- journalTransaction.prepare(file);
+ tnp = new JournalTransaction();
+
+ transactionInfos.put(transactionID, tnp);
}
- else
+
+ tnp.addPositive(file, recordID);
+
+ hasData = true;
+
+ break;
+ }
+ case DELETE_RECORD_TX:
+ {
+ TransactionHolder tx = transactions.get(transactionID);
+
+ if (tx == null)
{
- log.warn("Prepared transaction " + healthy + " wasn't considered completed, it will be ignored");
- journalTransaction.setInvalid(true);
- tx.invalid = true;
+ tx = new TransactionHolder(transactionID);
+ transactions.put(transactionID, tx);
}
- hasData = true;
- }
-
- break;
- }
- case COMMIT_RECORD:
- {
- TransactionHolder tx = transactions.remove(transactionID);
-
- // We need to read it even if transaction was not found, or the reading checks would fail
- // Pair <OrderId, NumberOfElements>
- Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
-
- if (tx != null)
- {
+ tx.recordsToDelete.add(recordID);
- JournalTransaction journalTransaction = transactionInfos.remove(transactionID);
+ JournalTransaction tnp = transactionInfos.get(transactionID);
- if (journalTransaction == null)
+ if (tnp == null)
{
- throw new IllegalStateException("Cannot find tx " + transactionID);
+ tnp = new JournalTransaction();
+
+ transactionInfos.put(transactionID, tnp);
}
-
- boolean healthy = checkTransactionHealth(
- journalTransaction, orderedFiles, values);
+ tnp.addNegative(file, recordID);
- if (healthy)
+ hasData = true;
+
+ break;
+ }
+ case PREPARE_RECORD:
+ {
+ TransactionHolder tx = transactions.get(transactionID);
+
+ // We need to read it even if transaction was not found, or the reading checks would fail
+ // Pair <OrderId, NumberOfElements>
+ Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
+
+ if (tx != null)
{
- for (RecordInfo txRecord: tx.recordInfos)
+
+ tx.prepared = true;
+
+ JournalTransaction journalTransaction = transactionInfos.get(transactionID);
+
+ if (journalTransaction == null)
{
- if (txRecord.isUpdate)
+ throw new IllegalStateException("Cannot find tx " + transactionID);
+ }
+
+
+ boolean healthy = checkTransactionHealth(
+ journalTransaction, orderedFiles, values);
+
+ if (healthy)
+ {
+ journalTransaction.prepare(file);
+ }
+ else
+ {
+ log.warn("Prepared transaction " + healthy + " wasn't considered completed, it will be ignored");
+ journalTransaction.setInvalid(true);
+ tx.invalid = true;
+ }
+
+ hasData = true;
+ }
+
+ break;
+ }
+ case COMMIT_RECORD:
+ {
+ TransactionHolder tx = transactions.remove(transactionID);
+
+ // We need to read it even if transaction was not found, or the reading checks would fail
+ // Pair <OrderId, NumberOfElements>
+ Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
+
+ if (tx != null)
+ {
+
+ JournalTransaction journalTransaction = transactionInfos.remove(transactionID);
+
+ if (journalTransaction == null)
+ {
+ throw new IllegalStateException("Cannot find tx " + transactionID);
+ }
+
+ boolean healthy = checkTransactionHealth(
+ journalTransaction, orderedFiles, values);
+
+
+ if (commitsToForget.contains(transactionID))
+ {
+ log.warn("Transaction being ignored because of a post rollback");
+ journalTransaction.forget();
+ }
+ else
+ if (healthy)
+ {
+ performedCommits.add(transactionID);
+
+ for (RecordInfo txRecord: tx.recordInfos)
{
- loadManager.updateRecord(txRecord);
+ if (txRecord.isUpdate)
+ {
+ loadManager.updateRecord(txRecord);
+ }
+ else
+ {
+ loadManager.addRecord(txRecord);
+ }
}
- else
+
+ for (Long deleteValue: tx.recordsToDelete)
{
- loadManager.addRecord(txRecord);
+ loadManager.deleteRecord(deleteValue);
}
+ journalTransaction.commit(file);
}
-
- for (Long deleteValue: tx.recordsToDelete)
+ else
{
- loadManager.deleteRecord(deleteValue);
+ log.warn("Transaction " + transactionID + " is missing elements so the transaction is being ignored");
+ journalTransaction.forget();
}
- journalTransaction.commit(file);
+
+ hasData = true;
}
- else
- {
- log.warn("Transaction " + transactionID + " is missing elements so the transaction is being ignored");
- journalTransaction.forget();
- }
- hasData = true;
+ break;
}
-
- break;
- }
- case ROLLBACK_RECORD:
- {
- TransactionHolder tx = transactions.remove(transactionID);
-
- if (tx != null)
- {
- JournalTransaction tnp = transactionInfos.remove(transactionID);
+ case ROLLBACK_RECORD:
+ {
+ TransactionHolder tx = transactions.remove(transactionID);
- if (tnp == null)
+ if (performedCommits.contains(transactionID) && !commitsToForget.contains(transactionID))
{
- throw new IllegalStateException("Cannot find tx " + transactionID);
+ log.warn("Transaction " + transactionID + " was rolled back after its commit! Reload will need to be restarted with that transaction being ignored");
+ commitsToForget.add(transactionID);
+ fileConsistent = false;
}
- tnp.rollback(file);
- hasData = true;
+ if (tx != null)
+ {
+ JournalTransaction tnp = transactionInfos.remove(transactionID);
+
+ if (tnp == null)
+ {
+ throw new IllegalStateException("Cannot find tx " + transactionID);
+ }
+
+ tnp.rollback(file);
+
+ hasData = true;
+ }
+
+ break;
}
-
- break;
+ default:
+ {
+ throw new IllegalStateException("Journal " + file.getFile().getFileName() +
+ " is corrupt, invalid record type " + recordType);
+ }
}
- default:
+
+ checkSize = bb.getInt();
+
+ if (checkSize != variableSize + recordSize)
{
- throw new IllegalStateException("Journal " + file.getFile().getFileName() +
- " is corrupt, invalid record type " + recordType);
+ throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
}
+
+ bb.position(file.getFile().calculateBlockStart(bb.position()));
+
+ if (recordType != FILL_CHARACTER)
+ {
+ lastDataPos = bb.position();
+ }
}
- checkSize = bb.getInt();
+ file.getFile().close();
- if (checkSize != variableSize + recordSize)
- {
- throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
+ if (hasData)
+ {
+ dataFiles.add(file);
}
-
- bb.position(file.getFile().calculateBlockStart(bb.position()));
-
- if (recordType != FILL_CHARACTER)
- {
- lastDataPos = bb.position();
- }
+ else
+ {
+ //Empty dataFiles with no data
+ freeFiles.add(file);
+ }
}
-
- file.getFile().close();
-
- if (hasData)
- {
- dataFiles.add(file);
- }
- else
- {
- //Empty dataFiles with no data
- freeFiles.add(file);
- }
- }
+ transactionIDSequence.set(maxTransactionID + 1);
+ }
+ while (!fileConsistent);
- transactionIDSequence.set(maxTransactionID + 1);
//Create any more files we need
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/ValidateTransactionHealthTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/ValidateTransactionHealthTest.java 2008-07-24 12:37:05 UTC (rev 4721)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/ValidateTransactionHealthTest.java 2008-07-24 12:39:56 UTC (rev 4722)
@@ -227,6 +227,11 @@
numberOfUpdates++;
}
+
+ // Records in ex that the journal restarted its load.
+ // NOTE(review): presumably the test inspects ex afterwards and treats it as a failure - confirm against the rest of the test.
+ public void restart()
+ {
+ ex = new Exception ("Journal was restarted");
+ }
}
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java 2008-07-24 12:37:05 UTC (rev 4721)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java 2008-07-24 12:39:56 UTC (rev 4722)
@@ -107,6 +107,10 @@
public void updateRecord(RecordInfo info)
{
}
+
+ // No-op: this LoadManager does nothing when the journal load is restarted.
+ public void restart()
+ {
+ }
});
Added: trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalAsyncTimeoutsTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalAsyncTimeoutsTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalAsyncTimeoutsTest.java 2008-07-24 12:39:56 UTC (rev 4722)
@@ -0,0 +1,235 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.timing.core.journal.impl;
+
+import java.util.ArrayList;
+
+import org.jboss.messaging.core.journal.PreparedTransactionInfo;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.SimpleEncoding;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * Timing tests for JournalImpl running on a FakeSequentialFileFactory whose callbacks
+ * can be held back, so that a commit or a rollback is expected to throw instead of completing.
+ *
+ * Each test then releases the callbacks, appends a rollback for the same transaction,
+ * re-creates and reloads the journal, and checks that no records were loaded.
+ */
+public class JournalAsyncTimeoutsTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Created lazily by setupJournal and reused by later setupJournal calls within the same test
+ private FakeSequentialFileFactory factory;
+
+ // Journal under test; re-created by every setupJournal call
+ JournalImpl journalImpl = null;
+
+ // Filled by journalImpl.load(records, transactions) in setupJournal
+ private ArrayList<RecordInfo> records = null;
+
+ // Filled by journalImpl.load(records, transactions) in setupJournal
+ private ArrayList<PreparedTransactionInfo> transactions = null;
+
+ // Static --------------------------------------------------------
+
+ // Not referenced anywhere in this class
+ private static final Logger log = Logger
+ .getLogger(JournalAsyncTimeoutsTest.class);
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+ * Placeholder: the whole body is commented out, so this test currently executes no statements
+ * and asserts nothing.
+ */
+ public void testAsynchronousCommit() throws Exception
+ {
+// final int JOURNAL_SIZE = 20000;
+//
+// setupJournal(JOURNAL_SIZE, 100, 5);
+//
+// assertEquals(2, factory.listFiles("tt").size());
+//
+// assertEquals(0, records.size());
+// assertEquals(0, transactions.size());
+//
+// for (int i = 0; i < 10 ; i++)
+// {
+// journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
+// journalImpl.forceMoveNextFile();
+// }
+//
+//
+// for (int i = 10; i < 20 ; i++)
+// {
+// journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
+// journalImpl.forceMoveNextFile();
+// }
+//
+// journalImpl.forceMoveNextFile();
+//
+// journalImpl.appendCommitRecord(1l);
+//
+ }
+
+
+
+ /**
+ * Appends 20 transactional adds for transaction 1 while the factory holds its callbacks,
+ * and expects appendCommitRecord to throw (the test fails with "Supposed to timeout" otherwise).
+ * After the callbacks are flushed and released, a rollback is appended for the same transaction
+ * and the journal is re-created and reloaded; the reload must produce no records and no data files.
+ */
+ public void testTransactionTimeoutOnCommit() throws Exception
+ {
+ final int JOURNAL_SIZE = 20000;
+
+ setupJournal(JOURNAL_SIZE, 1, 5, 1000);
+
+ assertEquals(5, factory.listFiles("tt").size());
+
+ assertEquals(0, records.size());
+ assertEquals(0, transactions.size());
+
+ // From here on the fake factory holds callbacks until flushAllCallbacks() is called below
+ factory.setHoldCallbacks(true);
+
+ for (int i = 0; i < 20; i++)
+ {
+ journalImpl.appendAddRecordTransactional(1l, (long) i, (byte) 0,
+ new SimpleEncoding(1, (byte) 15));
+ }
+
+ try
+ {
+ journalImpl.appendCommitRecord(1l);
+ fail ("Supposed to timeout");
+ }
+ catch (Exception e)
+ {
+ // Expected path: any Exception thrown by the commit is accepted here
+ }
+
+ factory.flushAllCallbacks();
+
+ factory.setHoldCallbacks(false);
+
+ journalImpl.appendRollbackRecord(1l);
+
+ // Stops the current journal, then creates and loads a new one on the same factory
+ setupJournal(JOURNAL_SIZE, 1, 5, 1000);
+
+ assertEquals(0, records.size());
+ assertEquals(0, journalImpl.getDataFilesCount());
+ }
+
+ /**
+ * Same scenario as the commit test, but the operation expected to throw while callbacks are held
+ * is appendRollbackRecord. After the callbacks are flushed and released, a second rollback of the
+ * same transaction must not throw, and the reloaded journal must contain no records.
+ */
+ public void testTransactionTimeoutOnRollback() throws Exception
+ {
+ final int JOURNAL_SIZE = 20000;
+
+ setupJournal(JOURNAL_SIZE, 1, 5, 1000);
+
+ assertEquals(5, factory.listFiles("tt").size());
+
+ assertEquals(0, records.size());
+ assertEquals(0, transactions.size());
+
+ // From here on the fake factory holds callbacks until flushAllCallbacks() is called below
+ factory.setHoldCallbacks(true);
+
+ for (int i = 0; i < 20; i++)
+ {
+ journalImpl.appendAddRecordTransactional(1l, (long) i, (byte) 0,
+ new SimpleEncoding(1, (byte) 15));
+ }
+
+ try
+ {
+ journalImpl.appendRollbackRecord(1l);
+ fail ("Supposed to timeout");
+ }
+ catch (Exception e)
+ {
+ // Expected path: any Exception thrown by the rollback is accepted here
+ }
+
+ factory.flushAllCallbacks();
+
+ factory.setHoldCallbacks(false);
+
+ // it shouldn't fail
+ journalImpl.appendRollbackRecord(1l);
+
+ // Stops the current journal, then creates and loads a new one on the same factory
+ setupJournal(JOURNAL_SIZE, 1, 5, 1000);
+
+ assertEquals(0, records.size());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ /**
+ * Resets the fixture: fresh empty record and transaction lists, and no factory or journal,
+ * so that the first setupJournal call of each test creates a new factory.
+ */
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ records = new ArrayList<RecordInfo>();
+
+ transactions = new ArrayList<PreparedTransactionInfo>();
+
+ factory = null;
+
+ journalImpl = null;
+
+ }
+
+ /**
+ * Stops the journal if a test created one. Anything thrown by stop() is ignored.
+ */
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ if (journalImpl != null)
+ {
+ try
+ {
+ journalImpl.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ // Private -------------------------------------------------------
+ /**
+ * Creates the factory on the first call only, stops any journal left from a previous call,
+ * then creates, starts and loads a new JournalImpl into the (cleared) records and transactions lists.
+ *
+ * @param journalSize first argument passed to the JournalImpl constructor
+ * @param alignment first argument passed to the FakeSequentialFileFactory constructor
+ * @param numberOfMinimalFiles second argument passed to the JournalImpl constructor
+ * @param timeout last argument passed to the JournalImpl constructor
+ * (NOTE(review): presumably the AIO timeout; its unit is not visible here - confirm)
+ */
+ private void setupJournal(final int journalSize, final int alignment,
+ final int numberOfMinimalFiles, final int timeout) throws Exception
+ {
+ if (factory == null)
+ {
+ // NOTE(review): the meaning of the second constructor argument (true) is not visible here - confirm
+ factory = new FakeSequentialFileFactory(alignment, true);
+ }
+
+ if (journalImpl != null)
+ {
+ journalImpl.stop();
+ }
+
+ // NOTE(review): the two boolean flags and the literal 1000 are positional arguments
+ // whose meaning is defined by the JournalImpl constructor, not shown here - confirm
+ journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, true,
+ true, factory, "tt", "tt", 1000, timeout);
+
+ journalImpl.start();
+
+ records.clear();
+ transactions.clear();
+
+ journalImpl.load(records, transactions);
+ }
+
+ // Inner classes -------------------------------------------------
+
+}
More information about the jboss-cvs-commits
mailing list