Author: kpvdr
Date: 2008-07-11 17:05:48 -0400 (Fri, 11 Jul 2008)
New Revision: 2189
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/tests/TransactionalTest.cpp
Log:
Fixed 1PC multi-queue atomic commits/aborts. Also fixed unnecessary prepared list writes
that occur in local transactions that contain no durable messages.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-11 15:55:56 UTC (rev 2188)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-11 21:05:48 UTC (rev 2189)
@@ -1222,7 +1222,7 @@
}
}
-void BdbMessageStore::completed(TPCTxnCtxt& txn,
+void BdbMessageStore::completed(TxnCtxt& txn,
bool commit)
{
try {
@@ -1260,21 +1260,25 @@
void BdbMessageStore::prepare(qpid::broker::TPCTransactionContext& ctxt)
{
checkInit();
- TPCTxnCtxt* txn = dynamic_cast<TPCTxnCtxt*>(&ctxt);
+ TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt);
if(!txn) throw InvalidTransactionContextException();
+ localPrepare(txn);
+}
+void BdbMessageStore::localPrepare(TxnCtxt* ctxt)
+{
try {
chkInitPreparedXidStore();
- txn->incrDtokRef();
- DataTokenImpl* dtokp = txn->getDtok();
+ ctxt->incrDtokRef();
+ DataTokenImpl* dtokp = ctxt->getDtok();
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
- preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, txn->getXid(),
false);
- txn->prepare(preparedXidStorePtr.get());
+ preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp,
ctxt->getXid(), false);
+ ctxt->prepare(preparedXidStorePtr.get());
// make sure all the data is written to disk before returning
- txn->sync();
+ ctxt->sync();
} catch (const std::exception& e) {
- QPID_LOG(error, "Error preparing xid " << txn->getXid()
<< ": " << e.what());
+ QPID_LOG(error, "Error preparing xid " << ctxt->getXid()
<< ": " << e.what());
throw;
}
}
@@ -1283,22 +1287,22 @@
{
checkInit();
TxnCtxt* txn(check(&ctxt));
- if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
- } else {
- txn->complete(true);
+ if (!txn->isTPC()) {
+ if (txn->impactedQueuesEmpty()) return;
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
}
+ completed(*dynamic_cast<TxnCtxt*>(txn), true);
}
void BdbMessageStore::abort(TransactionContext& ctxt)
{
checkInit();
TxnCtxt* txn(check(&ctxt));
- if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), false);
- } else {
- txn->complete(false);
+ if (!txn->isTPC()) {
+ if (txn->impactedQueuesEmpty()) return;
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
}
+ completed(*dynamic_cast<TxnCtxt*>(txn), false);
}
TxnCtxt* BdbMessageStore::check(TransactionContext* ctxt)
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-07-11 15:55:56 UTC (rev 2188)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-07-11 21:05:48 UTC (rev 2189)
@@ -169,7 +169,7 @@
bool create(Db& db,
IdSequence& seq,
const qpid::broker::Persistable& p);
- void completed(TPCTxnCtxt& txn,
+ void completed(TxnCtxt& txn,
bool commit);
void record2pcOp(Db& db,
TPCTxnCtxt& txn,
@@ -311,6 +311,8 @@
void prepare(qpid::broker::TPCTransactionContext& ctxt);
+ void localPrepare(TxnCtxt* ctxt);
+
void commit(qpid::broker::TransactionContext& ctxt);
void abort(qpid::broker::TransactionContext& ctxt);
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2008-07-11 15:55:56 UTC (rev 2188)
+++ store/trunk/cpp/lib/TxnCtxt.h 2008-07-11 21:05:48 UTC (rev 2189)
@@ -59,6 +59,7 @@
IdSequence* loggedtx;
boost::intrusive_ptr<DataTokenImpl> dtokp;
AutoScopedLock globalHolder;
+ JournalImpl* preparedXidStorePtr;
/**
* local txn id, if non XA.
@@ -72,6 +73,8 @@
commitTxn(static_cast<JournalImpl*>(*i), commit);
}
impactedQueues.clear();
+ if (preparedXidStorePtr)
+ commitTxn(preparedXidStorePtr, commit);
}
void commitTxn(JournalImpl* jc, bool commit) {
@@ -95,7 +98,7 @@
public:
- TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl),
txn(0) {
+ TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl),
preparedXidStorePtr(0), txn(0) {
if (loggedtx) {
std::stringstream s;
s << "rhm-tid" << this;
@@ -125,6 +128,8 @@
for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
}
+ if (preparedXidStorePtr)
+ sync_jrnl(preparedXidStorePtr, firstloop, allWritten);
firstloop = false;
}
}
@@ -167,7 +172,9 @@
virtual const std::string& getXid() { return tid; }
void addXidRecord(qpid::broker::ExternalQueueStore* queue) {
impactedQueues.insert(queue); }
+ inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr =
_preparedXidStorePtr; }
void complete(bool commit) { completeTxn(commit); }
+ bool impactedQueuesEmpty() { return impactedQueues.empty(); }
DataTokenImpl* getDtok() { return dtokp.get(); }
void incrDtokRef() { dtokp->addRef(); }
void recoverDtok(const u_int64_t rid, const std::string xid) {
@@ -182,25 +189,9 @@
{
protected:
const std::string xid;
- JournalImpl* preparedXidStorePtr;
- virtual void completeTxn(bool commit) {
- TxnCtxt::completeTxn(commit);
- if (preparedXidStorePtr) commitTxn(preparedXidStorePtr, commit);
- }
public:
- TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx),
xid(_xid), preparedXidStorePtr(0) {}
- void sync() {
- TxnCtxt::sync();
- bool allWritten = false;
- if (preparedXidStorePtr) {
- while (!allWritten) {
- allWritten = true;
- sync_jrnl(preparedXidStorePtr, true, allWritten);
- }
- }
- }
- inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr =
_preparedXidStorePtr; }
+ TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx),
xid(_xid) {}
inline virtual bool isTPC() { return true; }
inline virtual const std::string& getXid() { return xid; }
};
Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp 2008-07-11 15:55:56 UTC (rev 2188)
+++ store/trunk/cpp/tests/TransactionalTest.cpp 2008-07-11 21:05:48 UTC (rev 2189)
@@ -33,6 +33,7 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/log/Statement.h"
using namespace rhm::bdbstore;
using namespace qpid::broker;
@@ -55,6 +56,7 @@
// to end to simulate multi-queue txn complete failure.
while (impactedQueues.size() > num_queues_rem)
impactedQueues.erase(impactedQueues.begin());
}
+ void resetPreparedXidStorePtr() { preparedXidStorePtr = 0; }
};
// Test store which has sepcial begin() which returns a TestTPCTxnCtxt, and a method to
check for
@@ -68,6 +70,25 @@
// pass sequence number for c/a
return auto_ptr<TransactionContext>(new
TestTxnCtxt(&messageIdSequence));
}
+ void commit(TransactionContext& ctxt, const bool complete_prepared_list) {
+ checkInit();
+ TxnCtxt* txn(check(&ctxt));
+ if (!txn->isTPC()) {
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
+ if (!complete_prepared_list)
dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr();
+ }
+ completed(*dynamic_cast<TxnCtxt*>(txn), true);
+ }
+ void abort(TransactionContext& ctxt, const bool complete_prepared_list)
+ {
+ checkInit();
+ TxnCtxt* txn(check(&ctxt));
+ if (!txn->isTPC()) {
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
+ if (!complete_prepared_list)
dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr();
+ }
+ completed(*dynamic_cast<TxnCtxt*>(txn), false);
+ }
};
// === Helper fns ===
@@ -178,10 +199,11 @@
checkMsg(y, 0);
}
-void testMultiQueueTxn(const unsigned num_queues_rem, const bool commit)
+void testMultiQueueTxn(const unsigned num_queues_rem, const bool complete_prepared_list,
const bool commit)
{
setup<TestMessageStore>();
- std::auto_ptr<TransactionContext>
txn(static_cast<TestMessageStore*>(store.get())->begin());
+ TestMessageStore* tmsp = static_cast<TestMessageStore*>(store.get());
+ std::auto_ptr<TransactionContext> txn(tmsp->begin());
//create two messages and enqueue them onto both queues:
boost::intrusive_ptr<Message> msgA = createMessage("MessageA",
"exchange", "routing_key");
@@ -193,9 +215,9 @@
static_cast<TestTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem);
if (commit)
- store->commit(*txn);
+ tmsp->commit(*txn, complete_prepared_list);
else
- store->abort(*txn);
+ tmsp->abort(*txn, complete_prepared_list);
restart<TestMessageStore>();
// Check outcome
@@ -225,47 +247,61 @@
swap(false);
cout << "ok" << endl;
}
-/*
+
+QPID_AUTO_TEST_CASE(MultiQueueCommit)
+{
+ cout << test_filename << ".MultiQueueCommit: " << flush;
+ testMultiQueueTxn(2, true, true);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAbort)
+{
+ cout << test_filename << ".MultiQueueAbort: " << flush;
+ testMultiQueueTxn(2, true, false); // commit == false: exercise the abort path, not commit
+ cout << "ok" << endl;
+}
+
QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
{
cout << test_filename << ".MultiQueueNoQueueCommitRecover: "
<< flush;
- testMultiQueueTxn(0, true);
+ testMultiQueueTxn(0, false, true);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
{
cout << test_filename << ".MultiQueueNoQueueAbortRecover: "
<< flush;
- testMultiQueueTxn(0, false);
+ testMultiQueueTxn(0, false, false);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
{
cout << test_filename << ".MultiQueueSomeQueueCommitRecover: "
<< flush;
- testMultiQueueTxn(1, true);
+ testMultiQueueTxn(1, false, true);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
{
cout << test_filename << ".MultiQueueSomeQueueAbortRecover: "
<< flush;
- testMultiQueueTxn(1, false);
+ testMultiQueueTxn(1, false, false);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
{
cout << test_filename << ".MultiQueueAllQueueCommitRecover: "
<< flush;
- testMultiQueueTxn(2, true);
+ testMultiQueueTxn(2, false, true);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
{
cout << test_filename << ".MultiQueueAllQueueAbortRecover: "
<< flush;
- testMultiQueueTxn(2, false);
+ testMultiQueueTxn(2, false, false);
cout << "ok" << endl;
}
-*/
+
QPID_AUTO_TEST_SUITE_END()