[rhmessaging-commits] rhmessaging commits: r2206 - in store/branches/mrg-1.0/cpp: tests and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Fri Jul 18 09:27:59 EDT 2008
Author: kpvdr
Date: 2008-07-18 09:27:59 -0400 (Fri, 18 Jul 2008)
New Revision: 2206
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
Log:
Additional tidy-up on new code. Renamed some functions and variables to keep naming consistent around the Transaction Prepared List (TPL) name. Deleted an old BDB function that escaped detection from a previous BDB cleanup.
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-17 19:49:11 UTC (rev 2205)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-18 13:27:59 UTC (rev 2206)
@@ -50,7 +50,7 @@
qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
-BdbMessageStore::PreparedRecoverStruct::PreparedRecoverStruct(const u_int64_t _rid,
+BdbMessageStore::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid,
const bool _deq_flag,
const bool _commit_flag) :
rid(_rid),
@@ -241,11 +241,11 @@
open(mappingDb, txn.get(), "mappings.db", true);
open(bindingDb, txn.get(), "bindings.db", true);
open(generalDb, txn.get(), "general.db", false);
- preparedXidStorePtr.reset(new JournalImpl("PreparedXidStore", getPxidBaseDir(), "prepared_xid", defJournalGetEventsTimeout, defJournalFlushTimeout));
+ tplStorePtr.reset(new JournalImpl("TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout));
txn.commit();
} catch (const journal::jexception& e) {
txn.abort();
- THROW_STORE_EXCEPTION_2("Error opening preparedXidStore instance", e.what());
+ THROW_STORE_EXCEPTION_2("Error opening tplStore instance", e.what());
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error opening databases", e);
@@ -260,12 +260,12 @@
return true;
}
-void BdbMessageStore::chkInitPreparedXidStore()
+void BdbMessageStore::chkTplStoreInit()
{
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- journal::jdir::create_dir(getPxidBaseDir());
- if (!preparedXidStorePtr->is_ready()) {
- preparedXidStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
+ journal::jdir::create_dir(getTplBaseDir());
+ if (!tplStorePtr->is_ready()) {
+ tplStorePtr->initialize(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
}
}
@@ -285,7 +285,7 @@
for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
}
- if (preparedXidStorePtr->is_ready()) preparedXidStorePtr->stop(true);
+ if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
} catch (const DbException& e) {
QPID_LOG(error, "Error closing BDB databases: " << e.what());
} catch (const journal::jexception& e) {
@@ -313,7 +313,7 @@
txn->commit(0);
try {
journal::jdir::delete_dir(getJrnlBaseDir(),true);
- journal::jdir::delete_dir(getPxidBaseDir(),true);
+ journal::jdir::delete_dir(getTplBaseDir(),true);
}
catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
@@ -474,7 +474,7 @@
{
checkInit();
txn_list prepared;
- recoverXids(prepared);
+ recoverLockedMappings(prepared);
queue_index queues;//id->queue
exchange_index exchanges;//id->exchange
@@ -505,14 +505,14 @@
TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
std::auto_ptr<TPCTransactionContext> txn(tpcc);
- tpcc->prepare(preparedXidStorePtr.get());
+ tpcc->prepare(tplStorePtr.get());
// Restore data token state in TxnCtxt
- PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(i->xid);
- if (citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+ TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+ if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
tpcc->recoverDtok(citr->second.rid, i->xid);
- // If a record is found that is dequeued but not committed/aborted from preparedXidStore, then a complete() call
+ // If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
// was interrupted part way through committing/aborting the impacted queues. Complete this process.
bool incomplTxnFlag = citr->second.deq_flag;
@@ -581,8 +581,8 @@
maxQueueId = max(key.id, maxQueueId);
}
- // NOTE: highestRid is set by both recoverQueues() and collectPreparedXids() as
- // the messageIdSequence is used for both queue journals and the preparedXid journal.
+ // NOTE: highestRid is set by both recoverQueues() and recoverTplStore() as
+ // the messageIdSequence is used for both queue journals and the tpl journal.
messageIdSequence.reset(highestRid + 1);
queueIdSequence.reset(maxQueueId + 1);
@@ -669,7 +669,6 @@
generalIdSequence.reset(maxGeneralId + 1);
}
-
void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/,
qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue,
@@ -730,8 +729,8 @@
if (PreparedTransaction::isLocked(prepared, queue->getPersistenceId(), dtokp.rid()) ) {
std::string xid((char*)xidbuff, xidbuffSize);
- PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(xid);
- if (xid.size() > 0 && citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+ TplRecoverMapCitr citr = tplRecoverMap.find(xid);
+ if (xid.size() > 0 && citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
if (xid.size() > 0 && citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
if (citr->second.commit_flag)
queue->recover(msg);
@@ -793,8 +792,6 @@
return recovery.recoverMessage(header.buffer);
}
-
-
int BdbMessageStore::enqueueMessage(TxnCtxt& txn,
IdDbt& msgId,
RecoverableMessage::shared_ptr& msg,
@@ -826,26 +823,26 @@
return count;
}
-void BdbMessageStore::recoverPreparedXidJournal()
+void BdbMessageStore::recoverTplStore()
{
- if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf")) {
+ if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
u_int64_t thisHighestRid;
- preparedXidStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
+ tplStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
- preparedXidStorePtr->recover_complete(); // start journal.
+ tplStorePtr->recover_complete(); // start journal.
}
}
-void BdbMessageStore::getPreparedXidMap(PreparedRecoverMap& prepXidMap)
+void BdbMessageStore::getTplRecoverMap(TplRecoverMap& tplMap)
{
- if (preparedXidStorePtr.get()) {
- if (!preparedXidStorePtr->is_ready())
- recoverPreparedXidJournal();
+ if (tplStorePtr.get()) {
+ if (!tplStorePtr->is_ready())
+ recoverTplStore();
// TODO: The journal will return a const txn_map and the txn_map will support
// const operations at some point. Using non-const txn_map this way is ugly...
- journal::txn_map& tmap = preparedXidStorePtr->get_txn_map();
+ journal::txn_map& tmap = tplStorePtr->get_txn_map();
std::vector<std::string> xidList;
tmap.xid_list(xidList);
for (std::vector<std::string>::const_iterator i = xidList.begin(); i<xidList.end(); i++) {
@@ -865,18 +862,18 @@
}
assert(enqCnt == 1);
assert(deqCnt <= 1);
- prepXidMap.insert(PreparedRecoverMapPair(*i, PreparedRecoverStruct(rid, deqCnt == 1, commitFlag)));
+ tplMap.insert(TplRecoverMapPair(*i, TplRecoverStruct(rid, deqCnt == 1, commitFlag)));
}
}
}
-void BdbMessageStore::recoverXids(txn_list& txns)
+void BdbMessageStore::recoverLockedMappings(txn_list& txns)
{
- if (!preparedXidStorePtr->is_ready())
- getPreparedXidMap(preparedXidStoreRecoverMap);
+ if (!tplStorePtr->is_ready())
+ getTplRecoverMap(tplRecoverMap);
// Abort unprepared xids and populate the locked maps
- for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
+ for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
LockedMappings::shared_ptr enq_ptr;
enq_ptr.reset(new LockedMappings);
LockedMappings::shared_ptr deq_ptr;
@@ -885,25 +882,11 @@
}
}
-void BdbMessageStore::readLockedMappings(Db& db,
- txn_lock_map& mappings)
-{
- Cursor c;
- c.open(db, 0);
-
- Dbt key;
- IdPairDbt value;
- while (c.next(key, value)) {
- std::string xid(reinterpret_cast<char*>(key.get_data()), key.get_size());
- LockedMappings::add(mappings, xid, value.queueId(), value.messageId());
- }
-}
-
void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
{
- if (!preparedXidStorePtr->is_ready())
- getPreparedXidMap(preparedXidStoreRecoverMap);
- for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
+ if (!tplStorePtr->is_ready())
+ getTplRecoverMap(tplRecoverMap);
+ for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
xids.insert(i->first);
}
}
@@ -1250,14 +1233,15 @@
bool commit)
{
try {
+ chkTplStoreInit(); // Late initialize (if needed)
+
// Nothing to do if not prepared
- chkInitPreparedXidStore();
if (txn.getDtok()->is_enqueued()) {
txn.incrDtokRef();
DataTokenImpl* dtokp = txn.getDtok();
dtokp->set_dequeue_rid(dtokp->rid());
dtokp->set_rid(messageIdSequence.next());
- preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
+ tplStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
}
txn.complete(commit);
} catch (const std::exception& e) {
@@ -1292,13 +1276,14 @@
void BdbMessageStore::localPrepare(TxnCtxt* ctxt)
{
try {
- chkInitPreparedXidStore();
+ chkTplStoreInit(); // Late initialize (if needed)
+
ctxt->incrDtokRef();
DataTokenImpl* dtokp = ctxt->getDtok();
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
- preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
- ctxt->prepare(preparedXidStorePtr.get());
+ tplStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
+ ctxt->prepare(tplStorePtr.get());
// make sure all the data is written to disk before returning
ctxt->sync();
} catch (const std::exception& e) {
@@ -1449,10 +1434,10 @@
return dir.str();
}
-string BdbMessageStore::getPxidBaseDir()
+string BdbMessageStore::getTplBaseDir()
{
std::stringstream dir;
- dir << storeDir << "/rhm/pxid/" ;
+ dir << storeDir << "/rhm/tpl/" ;
return dir.str();
}
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-17 19:49:11 UTC (rev 2205)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-18 13:27:59 UTC (rev 2206)
@@ -61,17 +61,17 @@
typedef LockedMappings::map txn_lock_map;
typedef boost::ptr_list<PreparedTransaction> txn_list;
- // Structs for preparedXidStore recover state
- struct PreparedRecoverStruct {
+ // Structs for Transaction Prepared List (TPL) recover state
+ struct TplRecoverStruct {
u_int64_t rid;
bool deq_flag;
bool commit_flag;
- PreparedRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
+ TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
};
- typedef PreparedRecoverStruct PreparedRecover;
- typedef std::map<std::string, PreparedRecover> PreparedRecoverMap;
- typedef PreparedRecoverMap::const_iterator PreparedRecoverMapCitr;
- typedef std::pair<std::string, PreparedRecoverStruct> PreparedRecoverMapPair;
+ typedef TplRecoverStruct TplRecover;
+ typedef std::pair<std::string, TplRecover> TplRecoverMapPair;
+ typedef std::map<std::string, TplRecover> TplRecoverMap;
+ typedef TplRecoverMap::const_iterator TplRecoverMapCitr;
// Default store settings
static const u_int16_t defNumJrnlFiles = 8;
@@ -91,9 +91,9 @@
Db bindingDb;
Db generalDb;
- // Pointer to prepared XID journal instance
- boost::shared_ptr<JournalImpl> preparedXidStorePtr;
- PreparedRecoverMap preparedXidStoreRecoverMap;
+ // Pointer to Transaction Prepared List (TPL) journal instance
+ boost::shared_ptr<JournalImpl> tplStorePtr;
+ TplRecoverMap tplRecoverMap;
IdSequence queueIdSequence;
IdSequence exchangeIdSequence;
@@ -157,11 +157,9 @@
queue_index& index,
txn_list& locked,
message_index& prepared);
- void recoverPreparedXidJournal();
- void getPreparedXidMap(PreparedRecoverMap& prepXidMap);
- void recoverXids(txn_list& txns);
- void readLockedMappings(Db& db,
- txn_lock_map& mappings);
+ void recoverTplStore();
+ void getTplRecoverMap(TplRecoverMap& tplMap);
+ void recoverLockedMappings(txn_list& txns);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
void store(const qpid::broker::PersistableQueue* queue,
TxnCtxt* txn,
@@ -218,12 +216,12 @@
std::string getJrnlDir(const char* queueName);
std::string getJrnlBaseDir();
std::string getBdbBaseDir();
- std::string getPxidBaseDir();
+ std::string getTplBaseDir();
inline void checkInit() {
// TODO: change the default dir to ~/.qpidd
if (!isInit) init("/tmp", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
}
- void chkInitPreparedXidStore();
+ void chkTplStoreInit();
// debug aid for printing XIDs that may contain non-printable chars
static std::string xid2str(const std::string xid) {
Modified: store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-17 19:49:11 UTC (rev 2205)
+++ store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-18 13:27:59 UTC (rev 2206)
@@ -189,7 +189,7 @@
return static_cast<JournalImpl*>(queue.getExternalQueueStore())->get_open_txn_cnt();
}
u_int32_t getRemainingPreparedListTxns() {
- return preparedXidStorePtr->get_open_txn_cnt();
+ return tplStorePtr->get_open_txn_cnt();
}
};
More information about the rhmessaging-commits
mailing list