[rhmessaging-commits] rhmessaging commits: r2199 - in store/branches/mrg-1.0/cpp: tests and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Mon Jul 14 13:32:13 EDT 2008
Author: kpvdr
Date: 2008-07-14 13:32:13 -0400 (Mon, 14 Jul 2008)
New Revision: 2199
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp
Log:
Backport of trunk r.2189: Fixed 1PC multi-queue atomic commits/aborts. Also fixed unnecessary prepared list writes that occur in local transactions that contain no durable messages.
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 17:05:31 UTC (rev 2198)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 17:32:13 UTC (rev 2199)
@@ -1218,7 +1218,7 @@
}
}
-void BdbMessageStore::completed(TPCTxnCtxt& txn,
+void BdbMessageStore::completed(TxnCtxt& txn,
bool commit)
{
try {
@@ -1256,21 +1256,25 @@
void BdbMessageStore::prepare(qpid::broker::TPCTransactionContext& ctxt)
{
checkInit();
- TPCTxnCtxt* txn = dynamic_cast<TPCTxnCtxt*>(&ctxt);
+ TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt);
if(!txn) throw InvalidTransactionContextException();
+ localPrepare(txn);
+}
+void BdbMessageStore::localPrepare(TxnCtxt* ctxt)
+{
try {
chkInitPreparedXidStore();
- txn->incrDtokRef();
- DataTokenImpl* dtokp = txn->getDtok();
+ ctxt->incrDtokRef();
+ DataTokenImpl* dtokp = ctxt->getDtok();
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
- preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, txn->getXid(), false);
- txn->prepare(preparedXidStorePtr.get());
+ preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
+ ctxt->prepare(preparedXidStorePtr.get());
// make sure all the data is written to disk before returning
- txn->sync();
+ ctxt->sync();
} catch (const std::exception& e) {
- QPID_LOG(error, "Error preparing xid " << txn->getXid() << ": " << e.what());
+ QPID_LOG(error, "Error preparing xid " << ctxt->getXid() << ": " << e.what());
throw;
}
}
@@ -1279,22 +1283,22 @@
{
checkInit();
TxnCtxt* txn(check(&ctxt));
- if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
- } else {
- txn->complete(true);
+ if (!txn->isTPC()) {
+ if (txn->impactedQueuesEmpty()) return;
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
}
+ completed(*dynamic_cast<TxnCtxt*>(txn), true);
}
void BdbMessageStore::abort(TransactionContext& ctxt)
{
checkInit();
TxnCtxt* txn(check(&ctxt));
- if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), false);
- } else {
- txn->complete(false);
+ if (!txn->isTPC()) {
+ if (txn->impactedQueuesEmpty()) return;
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
}
+ completed(*dynamic_cast<TxnCtxt*>(txn), false);
}
TxnCtxt* BdbMessageStore::check(TransactionContext* ctxt)
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 17:05:31 UTC (rev 2198)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 17:32:13 UTC (rev 2199)
@@ -169,7 +169,7 @@
bool create(Db& db,
IdSequence& seq,
const qpid::broker::Persistable& p);
- void completed(TPCTxnCtxt& txn,
+ void completed(TxnCtxt& txn,
bool commit);
void record2pcOp(Db& db,
TPCTxnCtxt& txn,
@@ -306,9 +306,15 @@
void collectPreparedXids(std::set<std::string>& xids);
std::auto_ptr<qpid::broker::TransactionContext> begin();
+
std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
+
void prepare(qpid::broker::TPCTransactionContext& ctxt);
+
+ void localPrepare(TxnCtxt* ctxt);
+
void commit(qpid::broker::TransactionContext& ctxt);
+
void abort(qpid::broker::TransactionContext& ctxt);
qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
Modified: store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-14 17:05:31 UTC (rev 2198)
+++ store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-14 17:32:13 UTC (rev 2199)
@@ -59,6 +59,7 @@
IdSequence* loggedtx;
boost::intrusive_ptr<DataTokenImpl> dtokp;
AutoScopedLock globalHolder;
+ JournalImpl* preparedXidStorePtr;
/**
* local txn id, if non XA.
@@ -72,6 +73,8 @@
commitTxn(static_cast<JournalImpl*>(*i), commit);
}
impactedQueues.clear();
+ if (preparedXidStorePtr)
+ commitTxn(preparedXidStorePtr, commit);
}
void commitTxn(JournalImpl* jc, bool commit) {
@@ -95,7 +98,7 @@
public:
- TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), txn(0) {
+ TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), txn(0) {
if (loggedtx) {
std::stringstream s;
s << "rhm-tid" << this;
@@ -125,6 +128,8 @@
for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
}
+ if (preparedXidStorePtr)
+ sync_jrnl(preparedXidStorePtr, firstloop, allWritten);
firstloop = false;
}
}
@@ -167,7 +172,9 @@
virtual const std::string& getXid() { return tid; }
void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
+ inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
void complete(bool commit) { completeTxn(commit); }
+ bool impactedQueuesEmpty() { return impactedQueues.empty(); }
DataTokenImpl* getDtok() { return dtokp.get(); }
void incrDtokRef() { dtokp->addRef(); }
void recoverDtok(const u_int64_t rid, const std::string xid) {
@@ -182,25 +189,9 @@
{
protected:
const std::string xid;
- JournalImpl* preparedXidStorePtr;
- virtual void completeTxn(bool commit) {
- TxnCtxt::completeTxn(commit);
- if (preparedXidStorePtr) commitTxn(preparedXidStorePtr, commit);
- }
public:
- TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid), preparedXidStorePtr(0) {}
- void sync() {
- TxnCtxt::sync();
- bool allWritten = false;
- if (preparedXidStorePtr) {
- while (!allWritten) {
- allWritten = true;
- sync_jrnl(preparedXidStorePtr, true, allWritten);
- }
- }
- }
- inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
+ TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
inline virtual bool isTPC() { return true; }
inline virtual const std::string& getXid() { return xid; }
};
Modified: store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp 2008-07-14 17:05:31 UTC (rev 2198)
+++ store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp 2008-07-14 17:32:13 UTC (rev 2199)
@@ -33,6 +33,7 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/log/Statement.h"
using namespace rhm::bdbstore;
using namespace qpid::broker;
@@ -55,6 +56,7 @@
// to end to simulate multi-queue txn complete failure.
while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin());
}
+ void resetPreparedXidStorePtr() { preparedXidStorePtr = 0; }
};
// Test store which has sepcial begin() which returns a TestTPCTxnCtxt, and a method to check for
@@ -68,6 +70,25 @@
// pass sequence number for c/a
return auto_ptr<TransactionContext>(new TestTxnCtxt(&messageIdSequence));
}
+ void commit(TransactionContext& ctxt, const bool complete_prepared_list) {
+ checkInit();
+ TxnCtxt* txn(check(&ctxt));
+ if (!txn->isTPC()) {
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
+ if (!complete_prepared_list) dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr();
+ }
+ completed(*dynamic_cast<TxnCtxt*>(txn), true);
+ }
+ void abort(TransactionContext& ctxt, const bool complete_prepared_list)
+ {
+ checkInit();
+ TxnCtxt* txn(check(&ctxt));
+ if (!txn->isTPC()) {
+ localPrepare(dynamic_cast<TxnCtxt*>(txn));
+ if (!complete_prepared_list) dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr();
+ }
+ completed(*dynamic_cast<TxnCtxt*>(txn), false);
+ }
};
// === Helper fns ===
@@ -178,10 +199,11 @@
checkMsg(y, 0);
}
-void testMultiQueueTxn(const unsigned num_queues_rem, const bool commit)
+void testMultiQueueTxn(const unsigned num_queues_rem, const bool complete_prepared_list, const bool commit)
{
setup<TestMessageStore>();
- std::auto_ptr<TransactionContext> txn(static_cast<TestMessageStore*>(store.get())->begin());
+ TestMessageStore* tmsp = static_cast<TestMessageStore*>(store.get());
+ std::auto_ptr<TransactionContext> txn(tmsp->begin());
//create two messages and enqueue them onto both queues:
boost::intrusive_ptr<Message> msgA = createMessage("MessageA", "exchange", "routing_key");
@@ -193,9 +215,9 @@
static_cast<TestTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem);
if (commit)
- store->commit(*txn);
+ tmsp->commit(*txn, complete_prepared_list);
else
- store->abort(*txn);
+ tmsp->abort(*txn, complete_prepared_list);
restart<TestMessageStore>();
// Check outcome
@@ -225,47 +247,61 @@
swap(false);
cout << "ok" << endl;
}
-/*
+
+QPID_AUTO_TEST_CASE(MultiQueueCommit)
+{
+ cout << test_filename << ".MultiQueueCommit: " << flush;
+ testMultiQueueTxn(2, true, true);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAbort)
+{
+ cout << test_filename << ".MultiQueueAbort: " << flush;
+ testMultiQueueTxn(2, true, true);
+ cout << "ok" << endl;
+}
+
QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
{
cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush;
- testMultiQueueTxn(0, true);
+ testMultiQueueTxn(0, false, true);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
{
cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush;
- testMultiQueueTxn(0, false);
+ testMultiQueueTxn(0, false, false);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
{
cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush;
- testMultiQueueTxn(1, true);
+ testMultiQueueTxn(1, false, true);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
{
cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush;
- testMultiQueueTxn(1, false);
+ testMultiQueueTxn(1, false, false);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
{
cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush;
- testMultiQueueTxn(2, true);
+ testMultiQueueTxn(2, false, true);
cout << "ok" << endl;
}
QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
{
cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush;
- testMultiQueueTxn(2, false);
+ testMultiQueueTxn(2, false, false);
cout << "ok" << endl;
}
-*/
+
QPID_AUTO_TEST_SUITE_END()
More information about the rhmessaging-commits
mailing list