Author: kpvdr
Date: 2007-10-12 11:08:10 -0400 (Fri, 12 Oct 2007)
New Revision: 1021
Modified:
store/trunk/cpp/lib/jrnl/enq_map.cpp
store/trunk/cpp/lib/jrnl/enq_map.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
Log:
Corrected is_txn_synced() to take into account the commit/abort record as well as the
enqueues and dequeues
Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp 2007-10-12 14:07:24 UTC (rev 1020)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp 2007-10-12 15:08:10 UTC (rev 1021)
@@ -59,7 +59,7 @@
}
void
-enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid, bool locked) throw (jexception)
+enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid, const bool locked) throw (jexception)
{
fid_lock_pair rec(fid, locked);
pthread_mutex_lock(&_mutex);
@@ -95,7 +95,7 @@
}
const u_int16_t
-enq_map::get_remove_fid(const u_int64_t rid, const bool tx_flag) throw (jexception)
+enq_map::get_remove_fid(const u_int64_t rid, const bool txn_flag) throw (jexception)
{
pthread_mutex_lock(&_mutex);
emap_itr itr = _map.find(rid);
@@ -106,7 +106,7 @@
ss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "enq_map", "get_remove_fid");
}
- if (itr->second.second && !tx_flag) // locked
+ if (itr->second.second && !txn_flag) // locked, but not a commit/abort
{
pthread_mutex_unlock(&_mutex);
std::stringstream ss;
Modified: store/trunk/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.hpp 2007-10-12 14:07:24 UTC (rev 1020)
+++ store/trunk/cpp/lib/jrnl/enq_map.hpp 2007-10-12 15:08:10 UTC (rev 1021)
@@ -72,9 +72,11 @@
~enq_map();
void insert_fid(const u_int64_t rid, const u_int16_t fid) throw (jexception);
- void insert_fid(const u_int64_t rid, const u_int16_t fid, bool locked) throw (jexception);
+ void insert_fid(const u_int64_t rid, const u_int16_t fid, const bool locked)
+ throw (jexception);
const u_int16_t get_fid(const u_int64_t rid) throw (jexception);
- const u_int16_t get_remove_fid(const u_int64_t rid, const bool tx_flag=false) throw (jexception);
+ const u_int16_t get_remove_fid(const u_int64_t rid, const bool txn_flag = false)
+ throw (jexception);
void lock(const u_int64_t rid) throw (jexception);
void unlock(const u_int64_t rid) throw (jexception);
const bool is_locked(const u_int64_t rid) throw (jexception);
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-12 14:07:24 UTC (rev 1020)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-12 15:08:10 UTC (rev 1021)
@@ -257,7 +257,7 @@
const bool
jcntl::is_txn_synced(const std::string& xid) throw (jexception)
{
- return _tmap.is_txn_synced(xid);
+ return _wmgr.is_txn_synced(xid);
}
const u_int32_t
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-12 14:07:24 UTC (rev 1020)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-12 15:08:10 UTC (rev 1021)
@@ -54,7 +54,8 @@
_enq_busy(false),
_deq_busy(false),
_abort_busy(false),
- _commit_busy(false)
+ _commit_busy(false),
+ _txn_pending_set()
{}
wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc,
std::deque<data_tok*>* const dtokl,
@@ -70,7 +71,8 @@
_enq_busy(false),
_deq_busy(false),
_abort_busy(false),
- _commit_busy(false)
+ _commit_busy(false),
+ _txn_pending_set()
{}
wmgr::~wmgr()
@@ -438,6 +440,13 @@
if (itr->_enq_flag)
_wrfc.decr_enqcnt(itr->_fid);
}
+ std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
+ if (!res.second)
+ {
+ std::stringstream ss;
+ ss << std::hex << "_txn_pending_set: xid=\"" << xid << "\"";
+ throw jexception(jerrno::JERR_MAP_DUPLICATE, ss.str(), "wmgr", "abort");
+ }
#endif
done = true;
@@ -573,6 +582,13 @@
_wrfc.decr_enqcnt(fid);
}
}
+ std::pair<std::set<std::string>::iterator, bool> res = _txn_pending_set.insert(xid);
+ if (!res.second)
+ {
+ std::stringstream ss;
+ ss << std::hex << "_txn_pending_set: xid=\"" << xid << "\"";
+ throw jexception(jerrno::JERR_MAP_DUPLICATE, ss.str(), "wmgr", "commit");
+ }
#endif
done = true;
@@ -747,23 +763,44 @@
for (u_int32_t k=0; k<s; k++)
{
data_tok* dtp = pcbp->_pdtokl->at(k);
+ std::set<std::string>::iterator it;
switch (dtp->wstate())
{
case data_tok::ENQ_SUBM:
assert(dtp->wstate() == data_tok::ENQ_SUBM);
dtp->set_wstate(data_tok::ENQ);
+ if (dtp->has_xid())
+ _tmap.set_aio_compl(dtp->xid(), dtp->rid());
break;
case data_tok::DEQ_SUBM:
assert(dtp->wstate() == data_tok::DEQ_SUBM);
dtp->set_wstate(data_tok::DEQ);
+ if (dtp->has_xid())
+ _tmap.set_aio_compl(dtp->xid(), dtp->rid());
break;
case data_tok::ABORT_SUBM:
assert(dtp->wstate() == data_tok::ABORT_SUBM);
dtp->set_wstate(data_tok::ABORTED);
+ it = _txn_pending_set.find(dtp->xid());
+ if (it == _txn_pending_set.end())
+ {
+ std::stringstream ss;
+ ss << std::hex << "_txn_pending_set: abort xid=\"" << dtp->xid() << "\"";
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "wmgr", "get_events");
+ }
+ _txn_pending_set.erase(it);
break;
case data_tok::COMMIT_SUBM:
assert(dtp->wstate() == data_tok::COMMIT_SUBM);
dtp->set_wstate(data_tok::COMMITTED);
+ it = _txn_pending_set.find(dtp->xid());
+ if (it == _txn_pending_set.end())
+ {
+ std::stringstream ss;
+ ss << std::hex << "_txn_pending_set: commit xid=\"" << dtp->xid() << "\"";
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "wmgr", "get_events");
+ }
+ _txn_pending_set.erase(it);
break;
default:
std::stringstream ss;
@@ -771,8 +808,6 @@
throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, ss.str(), "wmgr", "get_events");
}
- if (dtp->has_xid())
- _tmap.set_aio_compl(dtp->xid(), dtp->rid());
_dtokl->push_back(dtp);
}
tot_data_toks += s;
@@ -805,6 +840,24 @@
return tot_data_toks;
}
+/// Tests whether all AIO for transaction xid (enqueues/dequeues and any commit/abort) is complete.
+/// @return true only if _tmap reports xid synced (or unknown) and no commit/abort for xid is pending.
+/// @throws jexception any error from _tmap.is_txn_synced() other than JERR_MAP_NOTFOUND.
+const bool
+wmgr::is_txn_synced(const std::string& xid) throw (jexception)
+{
+    // Check for outstanding enqueues/dequeues; an xid unknown to _tmap has none outstanding.
+    try { if (!_tmap.is_txn_synced(xid)) return false; }
+    catch (jexception& e)
+    {
+        if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+            throw; // rethrow the original exception object rather than a copy
+    }
+    // The xid stays in _txn_pending_set from commit/abort submission until its AIO completes,
+    // so the transaction is synced only when the xid is absent from the set.
+    return _txn_pending_set.find(xid) == _txn_pending_set.end();
+}
+
void
wmgr::initialize() throw (jexception)
{
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-10-12 14:07:24 UTC (rev 1020)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-10-12 15:08:10 UTC (rev 1021)
@@ -43,6 +43,7 @@
#include <jrnl/pmgr.hpp>
#include <jrnl/wrfc.hpp>
+#include <set>
namespace rhm
{
@@ -90,6 +91,7 @@
enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding
deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding
txn_rec _txn_rec; ///< Transaction record used for encoding/decoding
+ std::set<std::string> _txn_pending_set; ///< Set containing xids of pending commits/aborts
aio_cb _cb; ///< Callback function pointer for AIO events
public:
@@ -112,6 +114,7 @@
throw (jexception);
const iores flush();
const u_int32_t get_events(page_state state) throw (jexception);
+ const bool is_txn_synced(const std::string& xid) throw (jexception);
private:
void initialize() throw (jexception);