Author: kpvdr
Date: 2008-08-14 13:49:28 -0400 (Thu, 14 Aug 2008)
New Revision: 2305
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/txn_map.hpp
Log:
Fix for bug 458656, "List of xids from dtx.recover is not as expected". The call to
BdbMessageStore::collectPreparedXids() was using the prepared-transaction list obtained at
recovery, but this call must provide the current list whenever it is called. Added the
ability to ignore pending transactions when reading, and created a subclass of JournalImpl
(called TplJournalImpl) that always ignores them when read_data_record() is called. This
enables records of pending transactions to be read from the TPL. Now
BdbMessageStore::collectPreparedXids() re-reads the TPL before returning the XID list.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-08-14 13:44:04 UTC (rev 2304)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-08-14 17:49:28 UTC (rev 2305)
@@ -249,7 +249,7 @@
open(mappingDb, txn.get(), "mappings.db", true);
open(bindingDb, txn.get(), "bindings.db", true);
open(generalDb, txn.get(), "general.db", false);
- tplStorePtr.reset(new JournalImpl("TplStore", getTplBaseDir(),
"tpl", defJournalGetEventsTimeout, defJournalFlushTimeout));
+ tplStorePtr.reset(new TplJournalImpl("TplStore", getTplBaseDir(),
"tpl", defJournalGetEventsTimeout, defJournalFlushTimeout));
txn.commit();
} catch (const journal::jexception& e) {
txn.abort();
@@ -930,77 +930,99 @@
return count;
}
-void BdbMessageStore::recoverTplStore(TplRecoverMap& tplMap)
+/**
+ * Rebuilds tplRecoverMap from the records currently readable in the TPL journal.
+ *
+ * Called by recoverTplStore() during recovery, and by collectPreparedXids() after
+ * tplStorePtr->read_reset(), so that the XID list reflects the current TPL contents.
+ * tplStorePtr is a TplJournalImpl whose read_data_record() ignores pending transactions,
+ * so records of in-flight transactions are read instead of being reported as TXPENDING.
+ *
+ * Errors (AIO timeout, unexpected read result, journal::jexception) are reported via
+ * THROW_STORE_EXCEPTION.
+ */
+void BdbMessageStore::readTplStore()
{
+    tplRecoverMap.clear();
+    journal::txn_map& tmap = tplStorePtr->get_txn_map();
+    DataTokenImpl dtokp;
+    void* dbuff = NULL; size_t dbuffSize = 0;
+    void* xidbuff = NULL; size_t xidbuffSize = 0;
+    bool transientFlag = false;
+    bool externalFlag = false;
+    bool done = false;
+    try {
+        unsigned aio_sleep_cnt = 0;
+        while (!done) {
+            dtokp.reset();
+            dtokp.set_wstate(DataTokenImpl::ENQ);
+            switch (tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp)) {
+              case rhm::journal::RHM_IORES_SUCCESS: {
+                // Every TPL record contains both data and an XID
+                assert(dbuffSize>0);
+                assert(xidbuffSize>0);
+                std::string xid(static_cast<const char*>(xidbuff), xidbuffSize);
+                // A non-zero first data byte marks a 2PC transaction
+                bool is2PC = *(static_cast<char*>(dbuff)) != 0;
+
+                // Check transaction details; add to recover map
+                journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+                unsigned enqCnt = 0;
+                unsigned deqCnt = 0;
+                u_int64_t rid = 0;
+
+                // Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists.
+                // Note: will apply to both 1PC and 2PC transactions.
+                bool commitFlag = true;
+
+                for (journal::tdl_itr j = txnList.begin(); j != txnList.end(); ++j) {
+                    if (j->_enq_flag) {
+                        rid = j->_rid;
+                        enqCnt++;
+                    } else {
+                        commitFlag = j->_commit_flag;
+                        deqCnt++;
+                    }
+                }
+                assert(enqCnt == 1);
+                assert(deqCnt <= 1);
+                tplRecoverMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC)));
+
+                // NOTE(review): only xidbuff is freed; presumably dbuff shares its allocation - confirm in jrnl
+                ::free(xidbuff);
+                aio_sleep_cnt = 0;
+                break;
+              }
+              case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
+                if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+                    THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::readTplStore()");
+                ::usleep(AIO_SLEEP_TIME);
+                break;
+              case rhm::journal::RHM_IORES_EMPTY:
+                done = true;
+                break; // done with all messages. (add call in jrnl to test that _emap is empty.)
+              default:
+                // assert("...") on a string literal can never fire, and leaving done == false here
+                // would loop back and read again with no bound; report the unexpected state instead.
+                THROW_STORE_EXCEPTION("Store Error: Unexpected msg state in BdbMessageStore::readTplStore()");
+            } // switch
+        }
+    } catch (const journal::jexception& e) {
+        THROW_STORE_EXCEPTION(std::string("TPL readTplStore() failed: ") + e.what());
+    }
+}
+
+void BdbMessageStore::recoverTplStore()
+{
if (journal::jdir::exists(tplStorePtr->jrnl_dir() +
tplStorePtr->base_filename() + ".jinf")) {
u_int64_t thisHighestRid;
tplStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks,
tplWCacheNumPages, 0, thisHighestRid, 0);
- journal::txn_map& tmap = tplStorePtr->get_txn_map();
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
- DataTokenImpl dtokp;
- void* dbuff = NULL; size_t dbuffSize = 0;
- void* xidbuff = NULL; size_t xidbuffSize = 0;
- bool transientFlag = false;
- bool externalFlag = false;
- bool done = false;
- try {
- unsigned aio_sleep_cnt = 0;
- while (!done) {
- dtokp.reset();
- dtokp.set_wstate(DataTokenImpl::ENQ);
- switch (tplStorePtr->read_data_record(&dbuff, dbuffSize,
&xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp)) {
- case rhm::journal::RHM_IORES_SUCCESS: {
- // Every TPL record contains both data and an XID
- assert(dbuffSize>0);
- assert(xidbuffSize>0);
- std::string xid(static_cast<const char*>(xidbuff),
xidbuffSize);
- bool is2PC = *(static_cast<char*>(dbuff)) != 0;
+ // Load tplRecoverMap by reading the TPL store
+ readTplStore();
- // Check transaction details; add to recover map
- journal::txn_data_list txnList = tmap.get_tdata_list(xid);
- unsigned enqCnt = 0;
- unsigned deqCnt = 0;
- u_int64_t rid = 0;
-
- // Assume commit (roll forward) in cases where only prepare has been
called - ie only enqueue record exists.
- // Note: will apply to both 1PC and 2PC transactions.
- bool commitFlag = true;
-
- for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++)
{
- if (j->_enq_flag) {
- rid = j->_rid;
- enqCnt++;
- } else {
- commitFlag = j->_commit_flag;
- deqCnt++;
- }
- }
- assert(enqCnt == 1);
- assert(deqCnt <= 1);
- tplMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt ==
1, commitFlag, is2PC)));
-
- ::free(xidbuff);
- aio_sleep_cnt = 0;
- break;
- }
- case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO in
BdbMessageStore::recoverTplStore()");
- ::usleep(AIO_SLEEP_TIME);
- break;
- case rhm::journal::RHM_IORES_EMPTY:
- done = true;
- break; // done with all messages. (add call in jrnl to test that
_emap is empty.)
- default:
- assert("Store Error: Unexpected msg state");
- } // switch
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ")
+ e.what());
- }
-
tplStorePtr->recover_complete(); // start journal.
}
}
@@ -1008,7 +1015,7 @@
void BdbMessageStore::recoverLockedMappings(txn_list& txns)
{
if (!tplStorePtr->is_ready())
- recoverTplStore(tplRecoverMap);
+ recoverTplStore();
// Abort unprepaired xids and populate the locked maps
for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
@@ -1022,8 +1029,12 @@
void BdbMessageStore::collectPreparedXids(std::set<std::string>& xids)
{
- if (!tplStorePtr->is_ready())
- recoverTplStore(tplRecoverMap);
+ if (tplStorePtr->is_ready()) {
+ tplStorePtr->read_reset();
+ readTplStore();
+ } else {
+ recoverTplStore();
+ }
for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
// Discard all txns that are to be rolled forward/back and 1PC transactions
if (!i->second.deq_flag && i->second.tpc_flag)
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-08-14 13:44:04 UTC (rev 2304)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-08-14 17:49:28 UTC (rev 2305)
@@ -93,7 +93,7 @@
Db generalDb;
// Pointer to Transaction Prepared List (TPL) journal instance
- boost::shared_ptr<JournalImpl> tplStorePtr;
+ boost::shared_ptr<TplJournalImpl> tplStorePtr;
TplRecoverMap tplRecoverMap;
IdSequence queueIdSequence;
@@ -158,7 +158,8 @@
queue_index& index,
txn_list& locked,
message_index& prepared);
- void recoverTplStore(TplRecoverMap& tplMap);
+ void readTplStore();
+ void recoverTplStore();
void recoverLockedMappings(txn_list& txns);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
void store(const qpid::broker::PersistableQueue* queue,
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2008-08-14 13:44:04 UTC (rev 2304)
+++ store/trunk/cpp/lib/JournalImpl.h 2008-08-14 17:49:28 UTC (rev 2305)
@@ -75,7 +75,7 @@
boost::intrusive_ptr<qpid::broker::TimerTask> getEventsFireEventsPtr;
pthread_mutex_t _getf_mutex; // getEventsTimerSetFlag mutex
- u_int64_t lastReadRid; // rid of last read msg
+ u_int64_t lastReadRid; // rid of last read msg for loadMsgContent()
bool writeActivityFlag;
bool flushTriggeredFlag;
@@ -207,6 +207,28 @@
}
}; // class JournalImpl
+ class TplJournalImpl : public JournalImpl
+ {
+ public:
+ TplJournalImpl(const std::string& journalId,
+ const std::string& journalDirectory,
+ const std::string& journalBaseFilename,
+ const qpid::sys::Duration getEventsTimeout,
+ const qpid::sys::Duration flushTimeout) :
+ JournalImpl(journalId, journalDirectory, journalBaseFilename,
getEventsTimeout, flushTimeout)
+ {}
+
+ virtual ~TplJournalImpl() {}
+
+ // Special version of read_data_record that ignores transactions - needed
when reading the TPL
+ inline journal::iores read_data_record(void** const datapp, std::size_t&
dsize,
+ void** const xidpp, std::size_t& xidsize, bool& transient,
bool& external,
+ journal::data_tok* const dtokp) {
+ return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize,
transient, external, dtokp, true);
+ }
+ inline void read_reset() { _rmgr.invalidate(); }
+ };
+
} // namespace bdbstore
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-08-14 13:44:04 UTC (rev 2304)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-08-14 17:49:28 UTC (rev 2305)
@@ -310,14 +310,14 @@
#define RCINVALID_SLEEP_TIME_MS 5
iores
jcntl::read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp,
std::size_t& xidsize,
- bool& transient, bool& external, data_tok* const dtokp)
+ bool& transient, bool& external, data_tok* const dtokp, bool
ignore_pending_txns)
{
check_rstatus("read_data");
unsigned cnt = 0;
iores res;
do
{
- res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp);
+ res = _rmgr.read(datapp, dsize, xidpp, xidsize, transient, external, dtokp,
ignore_pending_txns);
if (res == RHM_IORES_RCINVALID)
{
get_wr_events(); // check for outstanding write events
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-08-14 13:44:04 UTC (rev 2304)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-08-14 17:49:28 UTC (rev 2305)
@@ -415,7 +415,8 @@
* \exception TODO
*/
iores read_data_record(void** const datapp, std::size_t& dsize, void** const
xidpp,
- std::size_t& xidsize, bool& transient, bool& external,
data_tok* const dtokp);
+ std::size_t& xidsize, bool& transient, bool& external,
data_tok* const dtokp,
+ bool ignore_pending_txns = false);
/**
* \brief Dequeues (marks as no longer needed) data record in journal.
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-08-14 13:44:04 UTC (rev 2304)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-08-14 17:49:28 UTC (rev 2305)
@@ -90,7 +90,7 @@
iores
rmgr::read(void** const datapp, std::size_t& dsize, void** const xidpp,
std::size_t& xidsize,
- bool& transient, bool& external, data_tok* dtokp)
+ bool& transient, bool& external, data_tok* dtokp, bool
ignore_pending_txns)
{
iores res = pre_read_check(dtokp);
if (res != RHM_IORES_SUCCESS)
@@ -161,8 +161,9 @@
}
catch (const jexception& e)
{
+ bool enforce_txns = !_jc->is_read_only() &&
!ignore_pending_txns;
// Block read for transactionally locked record (only when not
recovering)
- if (e.err_code() == jerrno::JERR_MAP_LOCKED &&
!_jc->is_read_only())
+ if (e.err_code() == jerrno::JERR_MAP_LOCKED && enforce_txns)
return RHM_IORES_TXPENDING;
// (Recover mode only) Ok, not in emap - now search tmap, if present
then read
@@ -181,7 +182,7 @@
is_enq = ditr->_drid == _hdr._rid;
}
}
- if (!_jc->is_read_only() && is_enq)
+ if (enforce_txns && is_enq)
return RHM_IORES_TXPENDING;
}
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-08-14 13:44:04 UTC (rev 2304)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-08-14 17:49:28 UTC (rev 2305)
@@ -77,7 +77,8 @@
void initialize(const rd_aio_cb rd_cb);
iores read(void** const datapp, std::size_t& dsize, void** const xidpp,
- std::size_t& xidsize, bool& transient, bool& external,
data_tok* dtokp);
+ std::size_t& xidsize, bool& transient, bool& external,
data_tok* dtokp,
+ bool ignore_pending_txns);
u_int32_t get_events(page_state state = AIO_COMPLETE);
void recover_complete();
inline bool is_valid() const {return _valid; }
Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp 2008-08-14 13:44:04 UTC (rev 2304)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp 2008-08-14 17:49:28 UTC (rev 2305)
@@ -102,9 +102,9 @@
* <pre>
* key data
*
- * xid1 --- vector< [ rid, drid, fid, enq_flag, aio_compl ] >
- * xid2 --- vector< [ rid, drid, fid, enq_flag, aio_compl ] >
- * xid3 --- vector< [ rid, drid, fid, enq_flag, aio_compl ] >
+ * xid1 --- vector< [ rid, drid, fid, enq_flag, commit_flag, aio_compl ] >
+ * xid2 --- vector< [ rid, drid, fid, enq_flag, commit_flag, aio_compl ] >
+ * xid3 --- vector< [ rid, drid, fid, enq_flag, commit_flag, aio_compl ] >
* ...
* </pre>
*/