Author: kpvdr
Date: 2009-01-21 09:58:32 -0500 (Wed, 21 Jan 2009)
New Revision: 3059
Modified:
store/trunk/cpp/lib/jrnl/enq_map.cpp
store/trunk/cpp/lib/jrnl/enq_map.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/tests/jrnl/_ut_enq_map.cpp
Log:
Further preparation for auto-expand: class emap - changed names using fid to pfid.
Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp 2009-01-20 17:42:23 UTC (rev 3058)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp 2009-01-21 14:58:32 UTC (rev 3059)
@@ -43,7 +43,7 @@
enq_map::enq_map():
_map(),
- _fid_enq_cnt()
+ _pfid_enq_cnt()
{
pthread_mutex_init(&_mutex, 0);
}
@@ -56,20 +56,20 @@
void
enq_map::set_num_jfiles(const u_int16_t num_jfiles)
{
- _fid_enq_cnt.set_size(num_jfiles);
+ _pfid_enq_cnt.set_size(num_jfiles);
}
void
-enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid)
+enq_map::insert_pfid(const u_int64_t rid, const u_int16_t pfid)
{
- insert_fid(rid, fid, false);
+ insert_pfid(rid, pfid, false);
}
void
-enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid, const bool locked)
+enq_map::insert_pfid(const u_int64_t rid, const u_int16_t pfid, const bool locked)
{
std::pair<emap_itr, bool> ret;
- fid_lock_pair rec(fid, locked);
+ emap_data_struct rec(pfid, locked);
{
slock s(&_mutex);
ret = _map.insert(emap_param(rid, rec));
@@ -77,14 +77,14 @@
if (ret.second == false)
{
std::ostringstream oss;
- oss << std::hex << "rid=0x" << rid << "
fid=0x" << fid;
+ oss << std::hex << "rid=0x" << rid << "
pfid=0x" << pfid;
throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "enq_map",
"insert");
}
- _fid_enq_cnt.incr(fid);
+ _pfid_enq_cnt.incr(pfid);
}
u_int16_t
-enq_map::get_fid(const u_int64_t rid)
+enq_map::get_pfid(const u_int64_t rid)
{
emap_itr itr;
{
@@ -97,17 +97,17 @@
oss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "enq_map",
"get_fid");
}
- if (itr->second.second) // locked
+ if (itr->second._lock)
{
std::ostringstream oss;
oss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "enq_map",
"get_fid");
}
- return itr->second.first;
+ return itr->second._pfid;
}
u_int16_t
-enq_map::get_remove_fid(const u_int64_t rid, const bool txn_flag)
+enq_map::get_remove_pfid(const u_int64_t rid, const bool txn_flag)
{
slock s(&_mutex);
emap_itr itr = _map.find(rid);
@@ -117,16 +117,16 @@
oss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "enq_map",
"get_remove_fid");
}
- if (itr->second.second && !txn_flag) // locked, but not a commit/abort
+ if (itr->second._lock && !txn_flag) // locked, but not a commit/abort
{
std::ostringstream oss;
oss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "enq_map",
"get_remove_fid");
}
- u_int16_t fid = itr->second.first;
+ u_int16_t pfid = itr->second._pfid;
_map.erase(itr);
- _fid_enq_cnt.decr(fid);
- return fid;
+ _pfid_enq_cnt.decr(pfid);
+ return pfid;
}
bool
@@ -136,7 +136,7 @@
emap_itr itr = _map.find(rid);
if (itr == _map.end()) // not found in map
return false;
- if (!ignore_lock && itr->second.second) // locked
+ if (!ignore_lock && itr->second._lock) // locked
return false;
return true;
}
@@ -152,7 +152,7 @@
oss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "enq_map",
"get_remove_fid");
}
- itr->second.second = true;
+ itr->second._lock = true;
}
void
@@ -166,7 +166,7 @@
oss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "enq_map",
"get_remove_fid");
}
- itr->second.second = false;
+ itr->second._lock = false;
}
bool
@@ -183,7 +183,7 @@
oss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "enq_map",
"get_fid");
}
- return itr->second.second;
+ return itr->second._lock;
}
void
@@ -204,7 +204,7 @@
{
slock s(&_mutex);
for (emap_itr itr = _map.begin(); itr != _map.end(); itr++)
- fv.push_back(itr->second.first);
+ fv.push_back(itr->second._pfid);
}
}
Modified: store/trunk/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.hpp 2009-01-20 17:42:23 UTC (rev 3058)
+++ store/trunk/cpp/lib/jrnl/enq_map.hpp 2009-01-21 14:58:32 UTC (rev 3059)
@@ -54,47 +54,53 @@
/**
* \class enq_map
- * \brief Class for storing the file id (fid) and a transaction locked flag for each
enqueued
+ * \brief Class for storing the physical file id (pfid) and a transaction locked flag
for each enqueued
* data block using the record id (rid) as a key. This is the primary mechanism
for
- * deterimining the enqueue low water mark: if an fid exists in this map, then
there is
+ * deterimining the enqueue low water mark: if a pfid exists in this map, then
there is
* at least one still-enqueued record in that file. (The transaction map must also
be
* clear, however.)
*
- * Map keying rids against fid and lock status. As records ar enqueued, they are added
to this
+ * Map rids against pfid and lock status. As records are enqueued, they are added to
this
* map, and as they are dequeued, they are removed. An enqueue is locked when a
transactional
- * dequeue is pending that has not been either committed or aborted.
+ * dequeue is pending that has been neither committed nor aborted.
* <pre>
* key data
*
- * rid1 --- [ fid, txn_lock ]
- * rid2 --- [ fid, txn_lock ]
- * rid3 --- [ fid, txn_lock ]
+ * rid1 --- [ pfid, txn_lock ]
+ * rid2 --- [ pfid, txn_lock ]
+ * rid3 --- [ pfid, txn_lock ]
* ...
* </pre>
*/
class enq_map
{
private:
- typedef std::pair<u_int16_t, bool> fid_lock_pair;
- typedef std::pair<u_int64_t, fid_lock_pair> emap_param;
- typedef std::map<u_int64_t, fid_lock_pair> emap;
+
+ struct emap_data_struct
+ {
+ u_int16_t _pfid;
+ bool _lock;
+ emap_data_struct(const u_int16_t pfid, const bool lock) : _pfid(pfid),
_lock(lock) {}
+ };
+ typedef std::pair<u_int64_t, emap_data_struct> emap_param;
+ typedef std::map<u_int64_t, emap_data_struct> emap;
typedef emap::iterator emap_itr;
emap _map;
pthread_mutex_t _mutex;
- arr_cnt _fid_enq_cnt;
+ arr_cnt _pfid_enq_cnt;
public:
enq_map();
virtual ~enq_map();
void set_num_jfiles(const u_int16_t num_jfiles);
- inline u_int32_t get_enq_cnt(const u_int16_t fid) const { return
_fid_enq_cnt.cnt(fid); };
+ inline u_int32_t get_enq_cnt(const u_int16_t pfid) const { return
_pfid_enq_cnt.cnt(pfid); };
- void insert_fid(const u_int64_t rid, const u_int16_t fid);
- void insert_fid(const u_int64_t rid, const u_int16_t fid, const bool locked);
- u_int16_t get_fid(const u_int64_t rid);
- u_int16_t get_remove_fid(const u_int64_t rid, const bool txn_flag = false);
+ void insert_pfid(const u_int64_t rid, const u_int16_t pfid);
+ void insert_pfid(const u_int64_t rid, const u_int16_t pfid, const bool locked);
+ u_int16_t get_pfid(const u_int64_t rid);
+ u_int16_t get_remove_pfid(const u_int64_t rid, const bool txn_flag = false);
bool is_enqueued(const u_int64_t rid, bool ignore_lock = false);
void lock(const u_int64_t rid);
void unlock(const u_int64_t rid);
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2009-01-20 17:42:23 UTC (rev 3058)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2009-01-21 14:58:32 UTC (rev 3059)
@@ -696,7 +696,7 @@
std::free(xidp);
}
else
- _emap.insert_fid(h._rid, start_fid);
+ _emap.insert_pfid(h._rid, start_fid);
}
}
break;
@@ -726,7 +726,7 @@
{
try
{
- u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid(), true);
+ u_int16_t enq_fid = _emap.get_remove_pfid(dr.deq_rid(), true);
rd._enq_cnt_list[enq_fid]--;
}
catch(const jexception& e)
@@ -784,12 +784,12 @@
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (itr->_enq_flag) // txn enqueue
- _emap.insert_fid(itr->_rid, itr->_fid);
+ _emap.insert_pfid(itr->_rid, itr->_fid);
else // txn dequeue
{
try
{
- u_int16_t fid = _emap.get_remove_fid(itr->_drid,
true);
+ u_int16_t fid = _emap.get_remove_pfid(itr->_drid,
true);
rd._enq_cnt_list[fid]--;
}
catch (const jexception& e)
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2009-01-20 17:42:23 UTC (rev 3058)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2009-01-21 14:58:32 UTC (rev 3059)
@@ -156,7 +156,7 @@
bool is_enq = false;
try
{
- fid = _emap.get_fid(_hdr._rid); // If locked, will throw
JERR_MAP_LOCKED
+ fid = _emap.get_pfid(_hdr._rid); // If locked, will throw
JERR_MAP_LOCKED
is_enq = true;
}
catch (const jexception& e)
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2009-01-20 17:42:23 UTC (rev 3058)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2009-01-21 14:58:32 UTC (rev 3059)
@@ -195,7 +195,7 @@
_tmap.insert_txn_data(xid, txn_data(rid, 0, dtokp->fid(), true));
}
else
- _emap.insert_fid(rid, dtokp->fid());
+ _emap.insert_pfid(rid, dtokp->fid());
done = true;
}
@@ -334,7 +334,7 @@
}
else
{
- u_int16_t fid = _emap.get_remove_fid(dtokp->dequeue_rid());
+ u_int16_t fid = _emap.get_remove_pfid(dtokp->dequeue_rid());
_wrfc.decr_enqcnt(fid);
}
@@ -597,10 +597,10 @@
for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (itr->_enq_flag) // txn enqueue
- _emap.insert_fid(itr->_rid, itr->_fid);
+ _emap.insert_pfid(itr->_rid, itr->_fid);
else // txn dequeue
{
- u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
+ u_int16_t fid = _emap.get_remove_pfid(itr->_drid, true);
_wrfc.decr_enqcnt(fid);
}
}
@@ -1004,7 +1004,7 @@
bool found = false;
try
{
- _emap.get_fid(drid);
+ _emap.get_pfid(drid);
found = true;
}
catch(const jexception& e)
Modified: store/trunk/cpp/tests/jrnl/_ut_enq_map.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_ut_enq_map.cpp 2009-01-20 17:42:23 UTC (rev 3058)
+++ store/trunk/cpp/tests/jrnl/_ut_enq_map.cpp 2009-01-21 14:58:32 UTC (rev 3059)
@@ -53,17 +53,17 @@
QPID_AUTO_TEST_CASE(insert_get)
{
cout << test_filename << ".insert_get: " << flush;
- u_int16_t fid;
+ u_int16_t pfid;
u_int64_t rid;
- u_int16_t fid_start = 0x2000U;
+ u_int16_t pfid_start = 0x2000U;
u_int64_t rid_begin = 0xffffffff00000000ULL;
u_int64_t rid_end = 0xffffffff00000200ULL;
// insert with no dups
u_int64_t rid_incr_1 = 4ULL;
enq_map e2;
- for (rid = rid_begin, fid = fid_start; rid < rid_end; rid += rid_incr_1, fid++)
- e2.insert_fid(rid, fid);
+ for (rid = rid_begin, pfid = pfid_start; rid < rid_end; rid += rid_incr_1,
pfid++)
+ e2.insert_pfid(rid, pfid);
BOOST_CHECK(!e2.empty());
BOOST_CHECK_EQUAL(e2.size(), u_int32_t(128));
@@ -74,9 +74,9 @@
BOOST_CHECK_EQUAL(e2.is_enqueued(rid), (rid%rid_incr_1 ? false : true));
try
{
- u_int16_t exp_fid = fid_start + (u_int16_t)((rid - rid_begin)/rid_incr_1);
- u_int16_t ret_fid = e2.get_fid(rid);
- BOOST_CHECK_EQUAL(ret_fid, exp_fid);
+ u_int16_t exp_pfid = pfid_start + (u_int16_t)((rid - rid_begin)/rid_incr_1);
+ u_int16_t ret_fid = e2.get_pfid(rid);
+ BOOST_CHECK_EQUAL(ret_fid, exp_pfid);
BOOST_CHECK(rid%rid_incr_1 == 0);
}
catch (const jexception& e)
@@ -85,15 +85,15 @@
BOOST_CHECK(rid%rid_incr_1);
}
if ((rid + rid_incr_2)%(8 * rid_incr_2) == 0)
- fid++;
+ pfid++;
}
// insert with dups
- for (rid = rid_begin, fid = fid_start; rid < rid_end; rid += rid_incr_2, fid++)
+ for (rid = rid_begin, pfid = pfid_start; rid < rid_end; rid += rid_incr_2,
pfid++)
{
try
{
- e2.insert_fid(rid, fid);
+ e2.insert_pfid(rid, pfid);
BOOST_CHECK(rid%rid_incr_1);
}
catch (const jexception& e)
@@ -112,27 +112,27 @@
QPID_AUTO_TEST_CASE(get_remove)
{
cout << test_filename << ".get_remove: " << flush;
- u_int16_t fid;
+ u_int16_t pfid;
u_int64_t rid;
- u_int16_t fid_start = 0x3000U;
+ u_int16_t pfid_start = 0x3000U;
u_int64_t rid_begin = 0xeeeeeeee00000000ULL;
u_int64_t rid_end = 0xeeeeeeee00000200ULL;
u_int64_t rid_incr_1 = 4ULL;
u_int64_t num_incr_1 = (rid_end - rid_begin)/rid_incr_1;
enq_map e3;
- for (rid = rid_begin, fid = fid_start; rid < rid_end; rid += rid_incr_1, fid++)
- e3.insert_fid(rid, fid);
+ for (rid = rid_begin, pfid = pfid_start; rid < rid_end; rid += rid_incr_1,
pfid++)
+ e3.insert_pfid(rid, pfid);
BOOST_CHECK_EQUAL(e3.size(), num_incr_1);
u_int64_t rid_incr_2 = 6ULL;
- for (rid = rid_begin, fid = fid_start; rid < rid_end; rid += rid_incr_2, fid++)
+ for (rid = rid_begin, pfid = pfid_start; rid < rid_end; rid += rid_incr_2,
pfid++)
{
try
{
- u_int16_t exp_fid = fid_start + (u_int16_t)((rid - rid_begin)/rid_incr_1);
- u_int16_t ret_fid = e3.get_remove_fid(rid);
- BOOST_CHECK_EQUAL(ret_fid, exp_fid);
+ u_int16_t exp_pfid = pfid_start + (u_int16_t)((rid - rid_begin)/rid_incr_1);
+ u_int16_t ret_fid = e3.get_remove_pfid(rid);
+ BOOST_CHECK_EQUAL(ret_fid, exp_pfid);
BOOST_CHECK(rid%rid_incr_1 == 0);
}
catch (const jexception& e)
@@ -148,9 +148,9 @@
QPID_AUTO_TEST_CASE(lock)
{
cout << test_filename << ".lock: " << flush;
- u_int16_t fid;
+ u_int16_t pfid;
u_int64_t rid;
- u_int16_t fid_start = 0x4000U;
+ u_int16_t pfid_start = 0x4000U;
u_int64_t rid_begin = 0xdddddddd00000000ULL;
u_int64_t rid_end = 0xdddddddd00000200ULL;
@@ -159,9 +159,9 @@
u_int64_t num_incr_1 = (rid_end - rid_begin)/rid_incr_1;
bool locked = false;
enq_map e4;
- for (rid = rid_begin, fid = fid_start; rid < rid_end; rid += rid_incr_1, fid++)
+ for (rid = rid_begin, pfid = pfid_start; rid < rid_end; rid += rid_incr_1,
pfid++)
{
- e4.insert_fid(rid, fid, locked);
+ e4.insert_pfid(rid, pfid, locked);
locked = !locked;
}
BOOST_CHECK_EQUAL(e4.size(), num_incr_1);
@@ -183,18 +183,18 @@
// get / unlock
for (u_int64_t rid = rid_begin; rid < rid_end; rid += rid_incr_1)
{
- try { e4.get_fid(rid); }
+ try { e4.get_pfid(rid); }
catch(const jexception& e)
{
BOOST_CHECK_EQUAL(e.err_code(), jerrno::JERR_MAP_LOCKED);
BOOST_CHECK(rid%(2*rid_incr_1));
// unlock, read, then relock
e4.unlock(rid);
- e4.get_fid(rid);
+ e4.get_pfid(rid);
e4.lock(rid);
try
{
- e4.get_fid(rid);
+ e4.get_pfid(rid);
BOOST_ERROR("Failed to throw exception when getting locked
record");
}
catch(const jexception& e)
@@ -206,7 +206,7 @@
// remove all; if locked, use with txn_flag true; should ignore all locked records
for (u_int64_t rid = rid_begin; rid < rid_end; rid += rid_incr_1)
- e4.get_remove_fid(rid, true);
+ e4.get_remove_pfid(rid, true);
BOOST_CHECK(e4.empty());
cout << "ok" << endl;
}
@@ -214,9 +214,9 @@
QPID_AUTO_TEST_CASE(lists)
{
cout << test_filename << ".lists: " << flush;
- u_int16_t fid;
+ u_int16_t pfid;
u_int64_t rid;
- u_int16_t fid_start = 0x5000UL;
+ u_int16_t pfid_start = 0x5000UL;
u_int64_t rid_begin = 0xdddddddd00000000ULL;
u_int64_t rid_end = 0xdddddddd00000200ULL;
@@ -226,11 +226,11 @@
vector<u_int64_t> rid_list;
vector<u_int16_t> fid_list;
enq_map e5;
- for (rid = rid_begin, fid = fid_start; rid < rid_end; rid += rid_incr_1, fid++)
+ for (rid = rid_begin, pfid = pfid_start; rid < rid_end; rid += rid_incr_1,
pfid++)
{
- e5.insert_fid(rid, fid);
+ e5.insert_pfid(rid, pfid);
rid_list.push_back(rid);
- fid_list.push_back(fid);
+ fid_list.push_back(pfid);
}
BOOST_CHECK_EQUAL(e5.size(), num_incr_1);
BOOST_CHECK_EQUAL(rid_list.size(), num_incr_1);
@@ -263,37 +263,37 @@
e6.set_num_jfiles(4);
// Add 100 enqueues to file 1, check that the counts match
- for (u_int16_t fid=0; fid<4; fid++)
- BOOST_CHECK_EQUAL(e6.get_enq_cnt(fid), u_int32_t(0));
+ for (u_int16_t pfid=0; pfid<4; pfid++)
+ BOOST_CHECK_EQUAL(e6.get_enq_cnt(pfid), u_int32_t(0));
for (u_int64_t rid=0; rid<100; rid++)
- e6.insert_fid(rid, 1);
- for (u_int16_t fid=0; fid<4; fid++)
+ e6.insert_pfid(rid, 1);
+ for (u_int16_t pfid=0; pfid<4; pfid++)
{
- if (fid == 1)
- BOOST_CHECK_EQUAL(e6.get_enq_cnt(fid), u_int32_t(100));
+ if (pfid == 1)
+ BOOST_CHECK_EQUAL(e6.get_enq_cnt(pfid), u_int32_t(100));
else
- BOOST_CHECK_EQUAL(e6.get_enq_cnt(fid), u_int32_t(0));
+ BOOST_CHECK_EQUAL(e6.get_enq_cnt(pfid), u_int32_t(0));
}
// Now remove 10 from file 1, check that the counts match
for (u_int64_t rid=0; rid<100; rid+=10)
- e6.get_remove_fid(rid);
- for (u_int16_t fid=0; fid<4; fid++)
+ e6.get_remove_pfid(rid);
+ for (u_int16_t pfid=0; pfid<4; pfid++)
{
- if (fid == 1)
- BOOST_CHECK_EQUAL(e6.get_enq_cnt(fid), u_int32_t(90));
+ if (pfid == 1)
+ BOOST_CHECK_EQUAL(e6.get_enq_cnt(pfid), u_int32_t(90));
else
- BOOST_CHECK_EQUAL(e6.get_enq_cnt(fid), u_int32_t(0));
+ BOOST_CHECK_EQUAL(e6.get_enq_cnt(pfid), u_int32_t(0));
}
// Now resize the file up and make sure the count in file 1 still exists
e6.set_num_jfiles(8);
- for (u_int16_t fid=0; fid<8; fid++)
+ for (u_int16_t pfid=0; pfid<8; pfid++)
{
- if (fid == 1)
- BOOST_CHECK_EQUAL(e6.get_enq_cnt(fid), u_int32_t(90));
+ if (pfid == 1)
+ BOOST_CHECK_EQUAL(e6.get_enq_cnt(pfid), u_int32_t(90));
else
- BOOST_CHECK_EQUAL(e6.get_enq_cnt(fid), u_int32_t(0));
+ BOOST_CHECK_EQUAL(e6.get_enq_cnt(pfid), u_int32_t(0));
}
cout << "ok" << endl;
@@ -311,17 +311,17 @@
// insert even rids with no dups
for (rid = rid_begin, rid_cnt = u_int64_t(0); rid_cnt < num_rid; rid += 2ULL,
rid_cnt++)
- e7.insert_fid(rid, u_int16_t(0));
+ e7.insert_pfid(rid, u_int16_t(0));
BOOST_CHECK_EQUAL(e7.size(), num_rid);
// insert odd rids with no dups
for (rid = rid_begin + 1, rid_cnt = u_int64_t(0); rid_cnt < num_rid; rid += 2ULL,
rid_cnt++)
- e7.insert_fid(rid, u_int16_t(0));
+ e7.insert_pfid(rid, u_int16_t(0));
BOOST_CHECK_EQUAL(e7.size(), num_rid * 2);
// remove even rids
for (rid = rid_begin, rid_cnt = u_int64_t(0); rid_cnt < num_rid; rid += 2ULL,
rid_cnt++)
- e7.get_remove_fid(rid);
+ e7.get_remove_pfid(rid);
BOOST_CHECK_EQUAL(e7.size(), num_rid);
cout << "ok" << endl;