Author: kpvdr
Date: 2007-10-09 12:01:15 -0400 (Tue, 09 Oct 2007)
New Revision: 979
Modified:
store/trunk/cpp/lib/Makefile.am
store/trunk/cpp/lib/jrnl/enq_map.cpp
store/trunk/cpp/lib/jrnl/enq_map.hpp
store/trunk/cpp/lib/jrnl/jerrno.cpp
store/trunk/cpp/lib/jrnl/jerrno.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
store/trunk/cpp/tests/jrnl/janalyze.py
Log:
Bugfix in rmgr::consume_xid_rec(); Added boolean transaction lock to enq-map.
Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am 2007-10-09 15:55:12 UTC (rev 978)
+++ store/trunk/cpp/lib/Makefile.am 2007-10-09 16:01:15 UTC (rev 979)
@@ -1,4 +1,4 @@
-AM_CXXFLAGS = $(WARNING_CFLAGS) $(APR_CXXFLAGS) $(QPID_CXXFLAGS) -pthread
+AM_CXXFLAGS = $(WARNING_CFLAGS) $(APR_CXXFLAGS) $(QPID_CXXFLAGS) -DRHM_CLEAN -pthread
lib_LTLIBRARIES = libbdbstore.la
Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp 2007-10-09 15:55:12 UTC (rev 978)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp 2007-10-09 16:01:15 UTC (rev 979)
@@ -55,9 +55,16 @@
void
enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid) throw (jexception)
{
+ insert_fid(rid, fid, false);
+}
+
+void
+enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid, bool locked) throw (jexception)
+{
+ std::pair<u_int16_t, bool> rec(fid, locked);
pthread_mutex_lock(&_mutex);
- std::pair<std::map<u_int64_t, u_int16_t>::iterator, bool> ret =
- _map.insert(std::pair<u_int64_t, u_int16_t>(rid, fid));
+ std::pair<std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator, bool> ret =
+ _map.insert(std::pair<u_int64_t, std::pair<u_int16_t, bool> >(rid, rec));
pthread_mutex_unlock(&_mutex);
if (ret.second == false)
{
@@ -72,43 +79,102 @@
enq_map::get_fid(const u_int64_t rid) throw (jexception)
{
pthread_mutex_lock(&_mutex);
- std::map<u_int64_t, u_int16_t>::iterator itr = _map.find(rid);
+ std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr = _map.find(rid);
pthread_mutex_unlock(&_mutex);
- if (itr == _map.end())
+ if (itr == _map.end()) // not found in map
{
std::stringstream ss;
- ss << std::hex << std::setfill('0');
- ss << "rid=0x" << std::setw(16) << rid;
+ ss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map", "get_fid");
}
- return itr->second;
+ if (itr->second.second) // locked
+ {
+ std::stringstream ss;
+ ss << std::hex << "rid=0x" << rid;
+ throw jexception(jerrno::JERR_EMAP_LOCKED, ss.str(), "enq_map", "get_fid");
+ }
+ return itr->second.first;
}
const u_int16_t
enq_map::get_remove_fid(const u_int64_t rid) throw (jexception)
{
pthread_mutex_lock(&_mutex);
- std::map<u_int64_t, u_int16_t>::iterator itr = _map.find(rid);
- if (itr == _map.end())
+ std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr = _map.find(rid);
+ if (itr == _map.end()) // not found in map
{
pthread_mutex_unlock(&_mutex);
std::stringstream ss;
- ss << std::hex << std::setfill('0');
- ss << "rid=0x" << std::setw(16) << rid;
+ ss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map", "get_remove_fid");
}
- u_int16_t fid = itr->second;
+ if (itr->second.second) // locked
+ {
+ pthread_mutex_unlock(&_mutex);
+ std::stringstream ss;
+ ss << std::hex << "rid=0x" << rid;
+ throw jexception(jerrno::JERR_EMAP_LOCKED, ss.str(), "enq_map", "get_remove_fid");
+ }
+ u_int16_t fid = itr->second.first;
_map.erase(itr);
pthread_mutex_unlock(&_mutex);
return fid;
}
void
+enq_map::lock(const u_int64_t rid)
+{
+ pthread_mutex_lock(&_mutex);
+ std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr = _map.find(rid);
+ if (itr == _map.end()) // not found in map
+ {
+ pthread_mutex_unlock(&_mutex);
+ std::stringstream ss;
+ ss << std::hex << "rid=0x" << rid;
+ throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map", "get_remove_fid");
+ }
+ itr->second.second = true;
+ pthread_mutex_unlock(&_mutex);
+}
+
+void
+enq_map::unlock(const u_int64_t rid)
+{
+ pthread_mutex_lock(&_mutex);
+ std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr = _map.find(rid);
+ if (itr == _map.end()) // not found in map
+ {
+ pthread_mutex_unlock(&_mutex);
+ std::stringstream ss;
+ ss << std::hex << "rid=0x" << rid;
+ throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map", "get_remove_fid");
+ }
+ itr->second.second = false;
+ pthread_mutex_unlock(&_mutex);
+}
+
+const bool
+enq_map::is_locked(const u_int64_t rid)
+{
+ pthread_mutex_lock(&_mutex);
+ std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr = _map.find(rid);
+ pthread_mutex_unlock(&_mutex);
+ if (itr == _map.end()) // not found in map
+ {
+ std::stringstream ss;
+ ss << std::hex << "rid=0x" << rid;
+ throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map", "get_fid");
+ }
+ return itr->second.second;
+}
+
+void
enq_map::rid_list(std::vector<u_int64_t>& rv)
{
rv.clear();
pthread_mutex_lock(&_mutex);
- for (std::map<u_int64_t, u_int16_t>::iterator itr = _map.begin(); itr != _map.end(); itr++)
+ for (std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr = _map.begin();
+ itr != _map.end(); itr++)
rv.push_back(itr->first);
pthread_mutex_unlock(&_mutex);
}
@@ -118,8 +184,9 @@
{
fv.clear();
pthread_mutex_lock(&_mutex);
- for (std::map<u_int64_t, u_int16_t>::iterator itr = _map.begin(); itr != _map.end(); itr++)
- fv.push_back(itr->second);
+ for (std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr = _map.begin();
+ itr != _map.end(); itr++)
+ fv.push_back(itr->second.first);
pthread_mutex_unlock(&_mutex);
}
Modified: store/trunk/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.hpp 2007-10-09 15:55:12 UTC (rev 978)
+++ store/trunk/cpp/lib/jrnl/enq_map.hpp 2007-10-09 16:01:15 UTC (rev 979)
@@ -53,13 +53,13 @@
/**
* \class enq_map
- * \brief Class for storing the file id (fid) for each enqueued data block using
- * the record id (rid) as a key.
+ * \brief Class for storing the file id (fid) and a transaction locked flag for each enqueued
+ * data block using the record id (rid) as a key.
*/
class enq_map
{
private:
- std::map<u_int64_t, u_int16_t> _map;
+ std::map<u_int64_t, std::pair<u_int16_t, bool> > _map;
pthread_mutex_t _mutex;
public:
@@ -67,8 +67,12 @@
~enq_map();
void insert_fid(const u_int64_t rid, const u_int16_t fid) throw (jexception);
+ void insert_fid(const u_int64_t rid, const u_int16_t fid, bool locked) throw (jexception);
const u_int16_t get_fid(const u_int64_t rid) throw (jexception);
const u_int16_t get_remove_fid(const u_int64_t rid) throw (jexception);
+ void lock(const u_int64_t rid);
+ void unlock(const u_int64_t rid);
+ const bool is_locked(const u_int64_t rid);
inline void clear() { _map.clear(); }
inline const bool empty() const { return _map.empty(); }
inline const u_int16_t size() const { return (u_int16_t)_map.size(); }
Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-10-09 15:55:12 UTC (rev 978)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-10-09 16:01:15 UTC (rev 979)
@@ -108,6 +108,7 @@
// class enq_map
const u_int32_t jerrno::JERR_EMAP_DUPLICATE = 0x0b00;
const u_int32_t jerrno::JERR_EMAP_NOTFOUND = 0x0b01;
+const u_int32_t jerrno::JERR_EMAP_LOCKED = 0x0b02;
// class jinf
const u_int32_t jerrno::JERR_JINF_CVALIDFAIL = 0x0c00;
@@ -206,6 +207,8 @@
"Attempted to insert enqueue record using duplicate key.");
_err_map[JERR_EMAP_NOTFOUND] = std::string("JERR_EMAP_NOTFOUND: "
"Key not found in enqueue map.");
+ _err_map[JERR_EMAP_LOCKED] = std::string("JERR_EMAP_LOCKED: "
+ "Record ID locked by a pending transaction.");
// class jinf
_err_map[JERR_JINF_CVALIDFAIL] = std::string("JERR_JINF_CVALIDFAIL: "
Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp 2007-10-09 15:55:12 UTC (rev 978)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp 2007-10-09 16:01:15 UTC (rev 979)
@@ -125,6 +125,7 @@
// class enq_map
static const u_int32_t JERR_EMAP_DUPLICATE; ///< Attempted to insert using duplicate key
static const u_int32_t JERR_EMAP_NOTFOUND; ///< Key not found in map
+ static const u_int32_t JERR_EMAP_LOCKED; ///< rid locked by pending txn
// class jinf
static const u_int32_t JERR_JINF_CVALIDFAIL; ///< Compatibility validation failure
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-10-09 15:55:12 UTC (rev 978)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-10-09 16:01:15 UTC (rev 979)
@@ -248,6 +248,7 @@
// Read header, determine next record type
while (true)
{
+//std::string s;
//std::cout << " [f pi=" << _pg_index << " d=" << dblks_rem() << " f=" << (_rrfc.empty()?"T":"F") << " status:" << _rrfc.file_handle()->status_str(s) << "]" << std::flush;
if(dblks_rem() == 0 && _rrfc.is_compl() &&
!_rrfc.is_wr_aio_outstanding())
{
@@ -292,7 +293,7 @@
#endif
if (is_enq) // ok, this record is enqueued, check it, then read it...
{
-//std::cout << " $" << std::flush;
+//std::cout << "e" << std::flush;
if (dtokp->rid())
{
if (_hdr._rid != dtokp->rid())
@@ -329,7 +330,7 @@
return res;
}
else // skip this record, it is already dequeued
-//{ std::cout << " %" << std::flush;
+//{ std::cout << "d" << std::flush;
consume_xid_rec(_hdr, rptr, dtokp);
//}
break;
@@ -510,7 +511,7 @@
{
enq_hdr ehdr;
::memcpy(&ehdr, rptr, sizeof(enq_hdr));
- dtokp->set_dsize(ehdr._xidsize + sizeof(enq_hdr) + sizeof(rec_tail));
+ dtokp->set_dsize(ehdr._xidsize + ehdr._dsize + sizeof(enq_hdr) + sizeof(rec_tail));
}
else if (h._magic == RHM_JDAT_DEQ_MAGIC)
{
@@ -579,8 +580,6 @@
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
dtokp->set_rstate(data_tok::SKIP_PART);
- // Use data_tok::dblocks_proc field to save how many skip bloks still to go...
-// dtokp->set_dblocks_read(dsize_dblks - tot_dblk_cnt);
//std::cout << " w]" << std::flush;
return RHM_IORES_AIO_WAIT;
}
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-10-09 15:55:12 UTC (rev 978)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-10-09 16:01:15 UTC (rev 979)
@@ -51,7 +51,7 @@
CPPUNIT_TEST(RecoverReadTest);
CPPUNIT_TEST(RecoveredReadTest);
CPPUNIT_TEST(RecoveredDequeueTest);
-// CPPUNIT_TEST(ComplexRecoveryTest1);
+ CPPUNIT_TEST(ComplexRecoveryTest1);
CPPUNIT_TEST(EncodeTest_000);
CPPUNIT_TEST(EncodeTest_001);
CPPUNIT_TEST(EncodeTest_002);
Modified: store/trunk/cpp/tests/jrnl/janalyze.py
===================================================================
--- store/trunk/cpp/tests/jrnl/janalyze.py 2007-10-09 15:55:12 UTC (rev 978)
+++ store/trunk/cpp/tests/jrnl/janalyze.py 2007-10-09 16:01:15 UTC (rev 979)
@@ -25,6 +25,7 @@
from struct import unpack, calcsize
from time import gmtime, strftime
+base_file_name = 'test'
dblk_size = 128
sblk_size = 4 * dblk_size
file_size = (3072 + 1) * sblk_size
@@ -359,8 +360,6 @@
return ''
def __str__(self):
-# if self.enq_tail == None:
-# return '%s %s [no tail]' % (Hdr.__str__(self), self.dsize, dstr)
return '%s %s%s %s %s' % (Hdr.__str__(self), print_xid(self.xidsize,
self.xid), print_data(self.dsize, self.data), self.enq_tail, self.print_flags())
@@ -510,7 +509,7 @@
self.last_file = self.file_num == self.file_start - 1
if self.file_num < 0 or self.file_num >= num_files:
raise Exception('Bad file number %d' % self.file_num)
- file_name = 'jdata/test.%04d.jdat' % self.file_num
+ file_name = 'jdata/' + base_file_name + '.%04d.jdat' % self.file_num
self.f = open(file_name)
self.fhdr = load(self.f, Hdr)
if seek_flag and self.f.tell() != self.fro:
@@ -558,7 +557,7 @@
tss = ''
print 'Analyzing journal files:'
for i in range(0, num_files):
- file_name = 'jdata/test.%04d.jdat' % i
+ file_name = 'jdata/' + base_file_name + '.%04d.jdat' % i
f = open(file_name)
fhdr = load(f, Hdr)
if fhdr.empty():