Author: kpvdr
Date: 2010-06-15 14:47:09 -0400 (Tue, 15 Jun 2010)
New Revision: 4027
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/txn_map.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
Log:
Refactor to remove exceptions from tmap execution path
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2010-06-15 18:47:09 UTC (rev 4027)
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+ Copyright (c) 2007, 2008, 2009, 2010 Red Hat, Inc.
This file is part of the Qpid async store library msgstore.so.
@@ -216,21 +216,14 @@
if (prep_tx_list_ptr)
{
for (msgstore::PreparedTransaction::list::iterator i =
prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
- try {
- txn_data_list tdl = _tmap.get_tdata_list(i->xid);
- assert(tdl.size()); // should never be empty
- for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
- if (tdl_itr->_enq_flag) { // enqueue op
- i->enqueues->add(queue_id, tdl_itr->_rid);
- } else { // dequeue op
- i->dequeues->add(queue_id, tdl_itr->_drid);
- }
+ txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
+ for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
+ if (tdl_itr->_enq_flag) { // enqueue op
+ i->enqueues->add(queue_id, tdl_itr->_rid);
+ } else { // dequeue op
+ i->dequeues->add(queue_id, tdl_itr->_drid);
}
}
- catch (const jexception& e) {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
- throw;
- }
}
}
std::ostringstream oss2;
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2010-06-15 18:47:09 UTC (rev 4027)
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+ Copyright (c) 2007, 2008, 2009, 2010 Red Hat, Inc.
This file is part of the Qpid async store library msgstore.so.
@@ -971,7 +971,7 @@
} else {
// Enqueue and/or dequeue tx
journal::txn_map& tmap = jc->get_txn_map();
- journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+ journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
bool enq = false;
bool deq = false;
for (journal::tdl_itr j = txnList.begin();
j<txnList.end(); j++) {
@@ -1081,13 +1081,8 @@
bool is2PC = *(static_cast<char*>(dbuff)) != 0;
// Check transaction details; add to recover map
- // NOTE: There is a small but finite probability that the xid read above may have been removed by
- // another thread on one of the active queues by the time the get_tdata_list() call below is made.
- // Since reading the TPL is not considered a high-speed operation and is used for recovery and other
- // infrequent uses, the following try-catch will work as well as attempting to lock down the
- // entire transaction map for this operation - but with less complexity.
- try {
- journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+ journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
+ if (!txnList.empty()) { // xid found in tmap
unsigned enqCnt = 0;
unsigned deqCnt = 0;
u_int64_t rid = 0;
@@ -1109,12 +1104,6 @@
assert(deqCnt <= 1);
tplRecoverMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid,
deqCnt == 1, commitFlag, is2PC)));
}
- catch (const journal::jexception& e) {
- ::free(xidbuff);
- aio_sleep_cnt = 0;
- if (e.err_code() == journal::jerrno::JERR_MAP_NOTFOUND) break; // ignore this xid; move on
- throw;
- }
::free(xidbuff);
aio_sleep_cnt = 0;
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2010-06-15 18:47:09 UTC (rev 4027)
@@ -8,7 +8,7 @@
*
* \author Kim van der Riet
*
- * Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+ * Copyright (c) 2007, 2008, 2009, 2010 Red Hat, Inc.
*
* This file is part of the Qpid async store library msgstore.so.
*
@@ -623,7 +623,7 @@
std::find(prep_txn_list_ptr->begin(),
prep_txn_list_ptr->end(), *itr);
if (pitr == prep_txn_list_ptr->end()) // not found in prepared list
{
- txn_data_list tdl = _tmap.get_remove_tdata_list(*itr);
+ txn_data_list tdl = _tmap.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
// Unlock any affected enqueues in emap
for (tdl_itr i=tdl.begin(); i<tdl.end(); i++)
{
@@ -691,7 +691,13 @@
assert(xidp != 0);
std::string xid((char*)xidp, er.xid_size());
_tmap.insert_txn_data(xid, txn_data(h._rid, 0, start_fid,
true));
- _tmap.set_aio_compl(xid, h._rid);
+ if (_tmap.set_aio_compl(xid, h._rid)) // xid or rid not found
+ {
+ std::ostringstream oss;
+ oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid;
+ oss << "\" rid=0x" << h._rid;
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "jcntl", "rcvr_get_next_record");
+ }
std::free(xidp);
}
else
@@ -718,7 +724,13 @@
std::string xid((char*)xidp, dr.xid_size());
_tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(),
start_fid, false,
dr.is_txn_coml_commit()));
- _tmap.set_aio_compl(xid, dr.rid());
+ if (_tmap.set_aio_compl(xid, dr.rid())) // xid or rid not found
+ {
+ std::ostringstream oss;
+ oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid;
+ oss << "\" rid=0x" << dr.rid();
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "jcntl", "rcvr_get_next_record");
+ }
std::free(xidp);
}
else
@@ -746,7 +758,7 @@
std::string xid((char*)xidp, ar.xid_size());
try
{
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (itr->_enq_flag)
@@ -779,7 +791,7 @@
std::string xid((char*)xidp, cr.xid_size());
try
{
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (itr->_enq_flag) // txn enqueue
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2010-06-15 18:47:09 UTC (rev 4027)
@@ -8,7 +8,7 @@
*
* \author Kim van der Riet
*
- * Copyright (c) 2007, 2008, 2009 Red Hat Inc.
+ * Copyright (c) 2007, 2008, 2009, 2010 Red Hat Inc.
*
* This file is part of the Qpid async store library msgstore.so.
*
@@ -66,6 +66,12 @@
_pfid_txn_cnt.set_size(num_jfiles);
}
+u_int32_t
+txn_map::get_txn_pfid_cnt(const u_int16_t pfid) const
+{
+ return _pfid_txn_cnt.cnt(pfid);
+}
+
bool
txn_map::insert_txn_data(const std::string& xid, const txn_data& td)
{
@@ -98,11 +104,7 @@
{
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
- {
- std::ostringstream oss;
- oss << std::hex << "xid=" << xid_format(xid);
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "get_tdata_list_nolock");
- }
+ return _empty_data_list;
return itr->second;
}
@@ -112,11 +114,7 @@
slock s(_mutex);
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
- {
- std::ostringstream oss;
- oss << std::hex << "xid=" << xid_format(xid);
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "get_remove_tdata_list");
- }
+ return _empty_data_list;
txn_data_list list = itr->second;
_map.erase(itr);
for (tdl_itr i=list.begin(); i!=list.end(); i++)
@@ -129,26 +127,22 @@
{
slock s(_mutex);
xmap_itr itr= _map.find(xid);
- if (itr == _map.end()) // not found in map
- return false;
- return true;
+ return itr != _map.end();
}
u_int32_t
-txn_map::get_rid_count(const std::string& xid)
+txn_map::enq_cnt()
{
- slock s(_mutex);
- xmap_itr itr = _map.find(xid);
- if (itr == _map.end()) // not found in map
- {
- std::ostringstream oss;
- oss << std::hex << "xid=" << xid_format(xid);
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "get_rid_count");
- }
- return itr->second.size();
+ return cnt(true);
}
u_int32_t
+txn_map::deq_cnt()
+{
+ return cnt(false); // dequeue ops carry _enq_flag == false; cnt(true) would duplicate enq_cnt()
+}
+
+u_int32_t
txn_map::cnt(const bool enq_flag)
{
slock s(_mutex);
@@ -164,33 +158,13 @@
return c;
}
-u_int32_t
-txn_map::cnt(const std::string& xid, const bool enq_flag)
-{
- slock s(_mutex);
- u_int32_t c = 0;
- xmap_itr i = _map.find(xid);
- if (i == _map.end()) // not found in map
- return 0;
- for (tdl_itr j = i->second.begin(); j < i->second.end(); j++)
- {
- if (j->_enq_flag == enq_flag)
- c++;
- }
- return c;
-}
-
-bool
+int8_t
txn_map::is_txn_synced(const std::string& xid)
{
slock s(_mutex);
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
- {
- std::ostringstream oss;
- oss << std::hex << "xid=" << xid_format(xid);
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "is_txn_synced");
- }
+ return -1;
bool is_synced = true;
for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
{
@@ -200,43 +174,30 @@
break;
}
}
- return is_synced;
+ return is_synced ? 1 : 0;
}
-bool
+int8_t
txn_map::set_aio_compl(const std::string& xid, const u_int64_t rid)
{
- bool ok = true;
- bool found = false;
+ slock s(_mutex);
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // xid not found in map
+ return -1;
+ for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
{
- slock s(_mutex);
- xmap_itr itr = _map.find(xid);
- if (itr == _map.end()) // not found in map
- ok = false;
- else
+ if (litr->_rid == rid)
{
- for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
- {
- if (litr->_rid == rid)
- {
- found = true;
- litr->_aio_compl = true;
- break;
- }
- }
+ litr->_aio_compl = true;
+ return 0; // rid found
}
}
- if (ok && !found)
- {
- std::ostringstream oss;
- oss << std::hex << "xid=" << xid_format(xid) << " rid=0x" << rid;
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "set_aio_compl");
- }
- return ok;
+ // xid present, but rid not found
+ return -2;
}
-const txn_data&
-txn_map::get_data(const std::string& xid, const u_int64_t rid)
+bool
+txn_map::data_exists(const std::string& xid, const u_int64_t rid)
{
bool found = false;
{
@@ -248,14 +209,8 @@
found = itr->_rid == rid;
itr++;
}
- if (!found)
- {
- std::ostringstream oss;
- oss << std::hex << "xid=" << xid_format(xid) << " rid=0x" << rid;
- throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "get_data");
- }
- return *itr;
}
+ return found;
}
bool
@@ -290,17 +245,5 @@
}
}
-// static fn
-std::string
-txn_map::xid_format(const std::string& xid)
-{
- if (xid.size() < 100)
- return xid;
- std::ostringstream oss;
- oss << "\"" << xid.substr(0, 20) << " ... " << xid.substr(xid.size() - 20, 20);
- oss << "\" [size: " << xid.size() << "]";
- return oss.str();
-}
-
} // namespace journal
} // namespace mrg
Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp 2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp 2010-06-15 18:47:09 UTC (rev 4027)
@@ -8,7 +8,7 @@
*
* \author Kim van der Riet
*
- * Copyright (c) 2007, 2008, 2009 Red Hat Inc.
+ * Copyright (c) 2007, 2008, 2009, 2010 Red Hat Inc.
*
* This file is part of the Qpid async store library msgstore.so.
*
@@ -119,37 +119,30 @@
xmap _map;
smutex _mutex;
arr_cnt _pfid_txn_cnt;
+ const txn_data_list _empty_data_list;
public:
txn_map();
virtual ~txn_map();
void set_num_jfiles(const u_int16_t num_jfiles);
- inline u_int32_t get_txn_pfid_cnt(const u_int16_t pfid) const
- { return _pfid_txn_cnt.cnt(pfid); };
-
+ u_int32_t get_txn_pfid_cnt(const u_int16_t pfid) const;
bool insert_txn_data(const std::string& xid, const txn_data& td);
const txn_data_list get_tdata_list(const std::string& xid);
const txn_data_list get_remove_tdata_list(const std::string& xid);
bool in_map(const std::string& xid);
- u_int32_t get_rid_count(const std::string& xid);
- inline u_int32_t enq_cnt() { return cnt(true); }
- inline u_int32_t enq_cnt(const std::string& xid) { return cnt(xid, true); }
- inline u_int32_t deq_cnt() { return cnt(true); }
- inline u_int32_t deq_cnt(const std::string& xid) { return cnt(xid, false); }
- bool is_txn_synced(const std::string& xid);
- bool set_aio_compl(const std::string& xid, const u_int64_t rid);
- const txn_data& get_data(const std::string& xid, const u_int64_t rid);
+ u_int32_t enq_cnt();
+ u_int32_t deq_cnt();
+ int8_t is_txn_synced(const std::string& xid); // -1=xid not found; 0=not synced; 1=synced
+ int8_t set_aio_compl(const std::string& xid, const u_int64_t rid); // -2=rid not found; -1=xid not found; 0=done
+ bool data_exists(const std::string& xid, const u_int64_t rid);
bool is_enq(const u_int64_t rid);
inline void clear() { _map.clear(); }
inline bool empty() const { return _map.empty(); }
- inline u_int32_t size() const { return u_int32_t(_map.size()); }
+ inline size_t size() const { return _map.size(); }
void xid_list(std::vector<std::string>& xv);
private:
u_int32_t cnt(const bool enq_flag);
- u_int32_t cnt(const std::string& xid, const bool enq_flag);
- static std::string xid_format(const std::string& xid);
-
const txn_data_list get_tdata_list_nolock(const std::string& xid);
};
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2010-06-14 21:51:43 UTC (rev 4026)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2010-06-15 18:47:09 UTC (rev 4027)
@@ -8,7 +8,7 @@
*
* \author Kim van der Riet
*
- * Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+ * Copyright (c) 2007, 2008, 2009, 2010 Red Hat, Inc.
*
* This file is part of the Qpid async store library msgstore.so.
*
@@ -372,7 +372,7 @@
// Delete this txn from tmap, unlock any locked records in emap
std::string xid((char*)xid_ptr, xid_len);
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
try
@@ -469,7 +469,7 @@
// Delete this txn from tmap, process records into emap
std::string xid((char*)xid_ptr, xid_len);
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (itr->_enq_flag) // txn enqueue
@@ -684,6 +684,8 @@
tot_data_toks++;
dtokp->set_wstate(data_tok::ENQ);
if (dtokp->has_xid())
+ // Ignoring return value here. A non-zero return can signify that the transaction
+ // has committed or aborted, and which was completed prior to the aio returning.
_tmap.set_aio_compl(dtokp->xid(), dtokp->rid());
break;
case data_tok::DEQ_SUBM:
@@ -691,6 +693,7 @@
tot_data_toks++;
dtokp->set_wstate(data_tok::DEQ);
if (dtokp->has_xid())
+ // Ignoring return value - see note above.
_tmap.set_aio_compl(dtokp->xid(), dtokp->rid());
break;
case data_tok::ABORT_SUBM:
@@ -772,14 +775,7 @@
bool
wmgr::is_txn_synced(const std::string& xid)
{
- bool is_synced = true;
- // Check for outstanding enqueues/dequeues
- try { is_synced = _tmap.is_txn_synced(xid); }
- catch (const jexception& e)
- {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
- }
- if (!is_synced)
+ if (_tmap.is_txn_synced(xid) == 0) // not synced
return false;
// Check for outstanding commit/aborts
std::set<std::string>::iterator it = _txn_pending_set.find(xid);
@@ -898,16 +894,7 @@
if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
throw;
if (xid.size())
- try
- {
- _tmap.get_data(xid, drid); // not in emap, try tmap
- found = true;
- }
- catch (const jexception& e)
- {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
- throw;
- }
+ found = _tmap.data_exists(xid, drid);
}
if (!found)
{