Author: kpvdr
Date: 2007-10-09 15:53:31 -0400 (Tue, 09 Oct 2007)
New Revision: 989
Added:
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/txn_map.hpp
Modified:
store/trunk/cpp/lib/Makefile.am
store/trunk/cpp/lib/jrnl/enq_map.cpp
store/trunk/cpp/lib/jrnl/enq_map.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jerrno.cpp
store/trunk/cpp/lib/jrnl/jerrno.hpp
store/trunk/cpp/lib/jrnl/pmgr.cpp
store/trunk/cpp/lib/jrnl/pmgr.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/tests/jrnl/Makefile.rtest
store/trunk/cpp/tests/jrnl/rtest
Log:
Added transaction map class txn_map; added it to class jcntl.
Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am 2007-10-09 19:44:52 UTC (rev 988)
+++ store/trunk/cpp/lib/Makefile.am 2007-10-09 19:53:31 UTC (rev 989)
@@ -53,6 +53,7 @@
jrnl/pmgr.cpp \
jrnl/rmgr.cpp \
jrnl/rrfc.cpp \
+ jrnl/txn_map.cpp \
jrnl/txn_rec.cpp \
jrnl/wmgr.cpp \
jrnl/wrfc.cpp \
@@ -75,6 +76,7 @@
jrnl/rcvdat.hpp \
jrnl/rmgr.hpp \
jrnl/rrfc.hpp \
+ jrnl/txn_map.hpp \
jrnl/txn_rec.hpp \
jrnl/wmgr.hpp \
jrnl/wrfc.hpp
Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp 2007-10-09 19:44:52 UTC (rev 988)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp 2007-10-09 19:53:31 UTC (rev 989)
@@ -61,17 +61,15 @@
void
enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid, bool locked) throw
(jexception)
{
- std::pair<u_int16_t, bool> rec(fid, locked);
+ fid_lock_pair rec(fid, locked);
pthread_mutex_lock(&_mutex);
- std::pair<std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator,
bool> ret =
- _map.insert(std::pair<u_int64_t, std::pair<u_int16_t, bool>
>(rid, rec));
+ std::pair<emap_itr, bool> ret = _map.insert(emap_param(rid, rec));
pthread_mutex_unlock(&_mutex);
if (ret.second == false)
{
std::stringstream ss;
- ss << std::hex << std::setfill('0');
- ss << "rid=0x" << std::setw(16) << rid <<
" fid=0x" << std::setw(4) << fid;
- throw jexception(jerrno::JERR_EMAP_DUPLICATE, ss.str(), "enq_map",
"insert");
+ ss << std::hex << "rid=0x" << rid << "
fid=0x" << fid;
+ throw jexception(jerrno::JERR_MAP_DUPLICATE, ss.str(), "enq_map",
"insert");
}
}
@@ -79,19 +77,19 @@
enq_map::get_fid(const u_int64_t rid) throw (jexception)
{
pthread_mutex_lock(&_mutex);
- std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr =
_map.find(rid);
+ emap_itr itr = _map.find(rid);
pthread_mutex_unlock(&_mutex);
if (itr == _map.end()) // not found in map
{
std::stringstream ss;
ss << std::hex << "rid=0x" << rid;
- throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map",
"get_fid");
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "enq_map",
"get_fid");
}
if (itr->second.second) // locked
{
std::stringstream ss;
ss << std::hex << "rid=0x" << rid;
- throw jexception(jerrno::JERR_EMAP_LOCKED, ss.str(), "enq_map",
"get_fid");
+ throw jexception(jerrno::JERR_MAP_LOCKED, ss.str(), "enq_map",
"get_fid");
}
return itr->second.first;
}
@@ -100,20 +98,20 @@
enq_map::get_remove_fid(const u_int64_t rid) throw (jexception)
{
pthread_mutex_lock(&_mutex);
- std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr =
_map.find(rid);
+ emap_itr itr = _map.find(rid);
if (itr == _map.end()) // not found in map
{
pthread_mutex_unlock(&_mutex);
std::stringstream ss;
ss << std::hex << "rid=0x" << rid;
- throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map",
"get_remove_fid");
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "enq_map",
"get_remove_fid");
}
if (itr->second.second) // locked
{
pthread_mutex_unlock(&_mutex);
std::stringstream ss;
ss << std::hex << "rid=0x" << rid;
- throw jexception(jerrno::JERR_EMAP_LOCKED, ss.str(), "enq_map",
"get_remove_fid");
+ throw jexception(jerrno::JERR_MAP_LOCKED, ss.str(), "enq_map",
"get_remove_fid");
}
u_int16_t fid = itr->second.first;
_map.erase(itr);
@@ -125,13 +123,13 @@
enq_map::lock(const u_int64_t rid)
{
pthread_mutex_lock(&_mutex);
- std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr =
_map.find(rid);
+ emap_itr itr = _map.find(rid);
if (itr == _map.end()) // not found in map
{
pthread_mutex_unlock(&_mutex);
std::stringstream ss;
ss << std::hex << "rid=0x" << rid;
- throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map",
"get_remove_fid");
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "enq_map",
"get_remove_fid");
}
itr->second.second = true;
pthread_mutex_unlock(&_mutex);
@@ -141,13 +139,13 @@
enq_map::unlock(const u_int64_t rid)
{
pthread_mutex_lock(&_mutex);
- std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr =
_map.find(rid);
+ emap_itr itr = _map.find(rid);
if (itr == _map.end()) // not found in map
{
pthread_mutex_unlock(&_mutex);
std::stringstream ss;
ss << std::hex << "rid=0x" << rid;
- throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map",
"get_remove_fid");
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "enq_map",
"get_remove_fid");
}
itr->second.second = false;
pthread_mutex_unlock(&_mutex);
@@ -157,13 +155,13 @@
enq_map::is_locked(const u_int64_t rid)
{
pthread_mutex_lock(&_mutex);
- std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr =
_map.find(rid);
+ emap_itr itr = _map.find(rid);
pthread_mutex_unlock(&_mutex);
if (itr == _map.end()) // not found in map
{
std::stringstream ss;
ss << std::hex << "rid=0x" << rid;
- throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map",
"get_fid");
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "enq_map",
"get_fid");
}
return itr->second.second;
}
@@ -173,8 +171,7 @@
{
rv.clear();
pthread_mutex_lock(&_mutex);
- for (std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr =
_map.begin();
- itr != _map.end(); itr++)
+ for (emap_itr itr = _map.begin(); itr != _map.end(); itr++)
rv.push_back(itr->first);
pthread_mutex_unlock(&_mutex);
}
@@ -184,8 +181,7 @@
{
fv.clear();
pthread_mutex_lock(&_mutex);
- for (std::map<u_int64_t, std::pair<u_int16_t, bool> >::iterator itr =
_map.begin();
- itr != _map.end(); itr++)
+ for (emap_itr itr = _map.begin(); itr != _map.end(); itr++)
fv.push_back(itr->second.first);
pthread_mutex_unlock(&_mutex);
}
Modified: store/trunk/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.hpp 2007-10-09 19:44:52 UTC (rev 988)
+++ store/trunk/cpp/lib/jrnl/enq_map.hpp 2007-10-09 19:53:31 UTC (rev 989)
@@ -59,7 +59,12 @@
class enq_map
{
private:
- std::map<u_int64_t, std::pair<u_int16_t, bool> > _map;
+ typedef std::pair<u_int16_t, bool> fid_lock_pair;
+ typedef std::pair<u_int64_t, fid_lock_pair> emap_param;
+ typedef std::map<u_int64_t, fid_lock_pair> emap;
+ typedef emap::iterator emap_itr;
+
+ emap _map;
pthread_mutex_t _mutex;
public:
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-09 19:44:52 UTC (rev 988)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-09 19:53:31 UTC (rev 989)
@@ -58,10 +58,11 @@
_autostop(true),
_datafh(NULL),
_emap(),
+ _tmap(),
_rrfc(),
_wrfc(),
- _rmgr(this, _emap, _rrfc),
- _wmgr(this, _emap, _wrfc)
+ _rmgr(this, _emap, _tmap, _rrfc),
+ _wmgr(this, _emap, _tmap, _wrfc)
{}
jcntl::~jcntl()
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-10-09 19:44:52 UTC (rev 988)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-10-09 19:53:31 UTC (rev 989)
@@ -134,6 +134,7 @@
// Journal control structures
lfh** _datafh; ///< Array of pointers to data file handles
enq_map _emap; ///< Enqueue map for low water mark management
+ txn_map _tmap; ///< Transaction map open transactions
rrfc _rrfc; ///< Read journal rotating file controller
wrfc _wrfc; ///< Write journal rotating file controller
rmgr _rmgr; ///< Read page manager which manages AIO
Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-10-09 19:44:52 UTC (rev 988)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-10-09 19:53:31 UTC (rev 989)
@@ -105,10 +105,10 @@
const u_int32_t jerrno::JERR_DTOK_ILLEGALSTATE = 0x0a00;
const u_int32_t jerrno::JERR_DTOK_RIDNOTSET = 0x0a01;
-// class enq_map
-const u_int32_t jerrno::JERR_EMAP_DUPLICATE = 0x0b00;
-const u_int32_t jerrno::JERR_EMAP_NOTFOUND = 0x0b01;
-const u_int32_t jerrno::JERR_EMAP_LOCKED = 0x0b02;
+// class enq_map, txn_map
+const u_int32_t jerrno::JERR_MAP_DUPLICATE = 0x0b00;
+const u_int32_t jerrno::JERR_MAP_NOTFOUND = 0x0b01;
+const u_int32_t jerrno::JERR_MAP_LOCKED = 0x0b02;
// class jinf
const u_int32_t jerrno::JERR_JINF_CVALIDFAIL = 0x0c00;
@@ -202,12 +202,11 @@
"Attempted to change to illegal state.");
_err_map[JERR_DTOK_RIDNOTSET] = std::string("JERR_DTOK_RIDNOTSET: Record ID not
set.");
- // class enq_map
- _err_map[JERR_EMAP_DUPLICATE] = std::string("JERR_EMAP_DUPLICATE: "
+ // class enq_map, txn_map
+ _err_map[JERR_MAP_DUPLICATE] = std::string("JERR_MAP_DUPLICATE: "
"Attempted to insert enqueue record using duplicate key.");
- _err_map[JERR_EMAP_NOTFOUND] = std::string("JERR_EMAP_NOTFOUND: "
- "Key not found in enqueue map.");
- _err_map[JERR_EMAP_LOCKED] = std::string("JERR_EMAP_LOCKED: "
+ _err_map[JERR_MAP_NOTFOUND] = std::string("JERR_MAP_NOTFOUND: Key not found in
enqueue map.");
+ _err_map[JERR_MAP_LOCKED] = std::string("JERR_MAP_LOCKED: "
"Record ID locked by a pending transaction.");
// class jinf
Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp 2007-10-09 19:44:52 UTC (rev 988)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp 2007-10-09 19:53:31 UTC (rev 989)
@@ -122,10 +122,10 @@
static const u_int32_t JERR_DTOK_ILLEGALSTATE; ///< Attempted to change to
illegal state
static const u_int32_t JERR_DTOK_RIDNOTSET; ///< Record ID not set
- // class enq_map
- static const u_int32_t JERR_EMAP_DUPLICATE; ///< Attempted to insert using
duplicate key
- static const u_int32_t JERR_EMAP_NOTFOUND; ///< Key not found in map
- static const u_int32_t JERR_EMAP_LOCKED; ///< rid locked by pending
txn
+ // class enq_map, txn_map
+ static const u_int32_t JERR_MAP_DUPLICATE; ///< Attempted to insert using
duplicate key
+ static const u_int32_t JERR_MAP_NOTFOUND; ///< Key not found in map
+ static const u_int32_t JERR_MAP_LOCKED; ///< rid locked by pending
txn
// class jinf
static const u_int32_t JERR_JINF_CVALIDFAIL; ///< Compatibility validation
failure
Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-10-09 19:44:52 UTC (rev 988)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-10-09 19:53:31 UTC (rev 989)
@@ -73,11 +73,13 @@
const u_int32_t pmgr::_sblksize = JRNL_SBLK_SIZE * JRNL_DBLK_SIZE;
-pmgr::pmgr(jcntl* jc, enq_map& emap, const u_int32_t pagesize, const u_int16_t
pages):
+pmgr::pmgr(jcntl* jc, enq_map& emap, txn_map& tmap, const u_int32_t pagesize,
+ const u_int16_t pages):
_pagesize(pagesize),
_pages(pages),
_jc(jc),
_emap(emap),
+ _tmap(tmap),
_dtokl(NULL),
_page_base_ptr(NULL),
_page_ptr_arr(NULL),
@@ -95,12 +97,13 @@
_cb(NULL)
{}
-pmgr::pmgr(jcntl* jc, enq_map& emap, const u_int32_t pagesize, const u_int16_t
pages,
+pmgr::pmgr(jcntl* jc, enq_map& emap, txn_map& tmap, const u_int32_t pagesize,
const u_int16_t pages,
std::deque<data_tok*>* const dtokl) throw (jexception):
_pagesize(pagesize),
_pages(pages),
_jc(jc),
_emap(emap),
+ _tmap(tmap),
_dtokl(dtokl),
_page_base_ptr(NULL),
_page_ptr_arr(NULL),
Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-10-09 19:44:52 UTC (rev 988)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-10-09 19:53:31 UTC (rev 989)
@@ -50,6 +50,7 @@
#include <jrnl/enq_map.hpp>
#include <jrnl/enq_rec.hpp>
#include <jrnl/nlfh.hpp>
+#include <jrnl/txn_map.hpp>
#include <jrnl/txn_rec.hpp>
namespace rhm
@@ -116,6 +117,7 @@
const u_int16_t _pages; ///< Number of page cache pages
jcntl* _jc; ///< Pointer to journal controller
enq_map& _emap; ///< Ref to enqueue map
+ txn_map& _tmap; ///< Ref to transaction map
std::deque<data_tok*>* _dtokl; ///< Pointer to external data token
list
void* _page_base_ptr; ///< Base pointer to page memory
void** _page_ptr_arr; ///< Array of pointers to pages in page
memory
@@ -135,9 +137,10 @@
aio_cb _cb; ///< Callback function pointer for AIO events
public:
- pmgr(jcntl* jc, enq_map& emap, const u_int32_t pagesize, const u_int16_t
pages);
- pmgr(jcntl* jc, enq_map& emap, const u_int32_t pagesize, const u_int16_t
pages,
- std::deque<data_tok*>* const dtokl) throw (jexception);
+ pmgr(jcntl* jc, enq_map& emap, txn_map& tmap, const u_int32_t pagesize,
+ const u_int16_t pages);
+ pmgr(jcntl* jc, enq_map& emap, txn_map& tmap, const u_int32_t pagesize,
+ const u_int16_t pages, std::deque<data_tok*>* const dtokl) throw
(jexception);
virtual ~pmgr();
virtual const u_int32_t get_events(page_state state) throw (jexception) = 0;
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-10-09 19:44:52 UTC (rev 988)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-10-09 19:53:31 UTC (rev 989)
@@ -42,14 +42,15 @@
namespace journal
{
-rmgr::rmgr(jcntl* jc, enq_map& emap, rrfc& rrfc):
- pmgr(jc, emap, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES),
+rmgr::rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc):
+ pmgr(jc, emap, tmap, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES),
_rrfc(rrfc),
_hdr()
{}
-rmgr::rmgr(jcntl* jc, enq_map& emap, rrfc& rrfc, std::deque<data_tok*>*
const dtokl) throw (jexception):
- pmgr(jc, emap, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES, dtokl),
+rmgr::rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc,
+ std::deque<data_tok*>* const dtokl) throw (jexception):
+ pmgr(jc, emap, tmap, JRNL_RMGR_PAGE_SIZE, JRNL_RMGR_PAGES, dtokl),
_rrfc(rrfc),
_hdr()
{}
@@ -286,7 +287,7 @@
}
catch (jexception& e)
{
- if (e.err_code() != jerrno::JERR_EMAP_NOTFOUND)
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
throw e;
//std::cout << "-nf" << std::flush;
}
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-10-09 19:44:52 UTC (rev 988)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-10-09 19:53:31 UTC (rev 989)
@@ -62,9 +62,9 @@
hdr _hdr; ///< Header used to determind record type
public:
- rmgr(jcntl* jc, enq_map& emap, rrfc& rrfc);
- rmgr(jcntl* jc, enq_map& emap, rrfc& rrfc, std::deque<data_tok*>*
const dtokl)
- throw (jexception);
+ rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc);
+ rmgr(jcntl* jc, enq_map& emap, txn_map& tmap, rrfc& rrfc,
+ std::deque<data_tok*>* const dtokl) throw (jexception);
~rmgr();
void initialize(std::deque<data_tok*>* const dtokl, const aio_cb rd_cb)
throw (jexception);
Added: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2007-10-09 19:53:31 UTC (rev 989)
@@ -0,0 +1,132 @@
+/**
+* \file txn_map.cpp
+*
+* Red Hat Messaging - Message Journal
+*
+* File containing code for class rhm::journal::txn_map (transaction map). See
+* comments in file txn_map.hpp for details.
+*
+* \author Kim van der Riet
+*
+* Copyright (C) 2007 Red Hat Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#include <jrnl/txn_map.hpp>
+
+#include <iomanip>
+#include <sstream>
+#include <jrnl/jerrno.hpp>
+
+namespace rhm
+{
+namespace journal
+{
+
+txn_map::txn_map():
+ _map()
+{
+ pthread_mutex_init(&_mutex, NULL);
+}
+
+txn_map::~txn_map()
+{
+ pthread_mutex_destroy(&_mutex);
+}
+
+void
+txn_map::insert_rid_fid(const std::string& xid, const u_int64_t rid, const u_int16_t
fid)
+ throw (jexception)
+{
+ rid_fid_pair rec(rid, fid);
+ pthread_mutex_lock(&_mutex);
+ xmap_itr itr = _map.find(xid);
+ pthread_mutex_unlock(&_mutex);
+ if (itr == _map.end()) // not found in map
+ {
+ rid_fid_list list;
+ list.push_back(rec);
+ std::pair<xmap_itr, bool> ret = _map.insert(xmap_param(xid, list));
+ }
+ else
+ itr->second.push_back(rec);
+}
+
+const txn_map::rid_fid_list
+txn_map::get_rid_fid_list(const std::string& xid) throw (jexception)
+{
+ pthread_mutex_lock(&_mutex);
+ xmap_itr itr = _map.find(xid);
+ pthread_mutex_unlock(&_mutex);
+ if (itr == _map.end()) // not found in map
+ {
+ std::stringstream ss;
+ ss << std::hex << "xid=\"" << xid <<
"\"";
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map",
"get_fid");
+ }
+ return itr->second;
+}
+
+const txn_map::rid_fid_list
+txn_map::get_remove_rid_fid_list(const std::string& xid) throw (jexception)
+{
+ pthread_mutex_lock(&_mutex);
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // not found in map
+ {
+ pthread_mutex_unlock(&_mutex);
+ std::stringstream ss;
+ ss << std::hex << "xid=\"" << xid <<
"\"";
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map",
"get_remove_fid");
+ }
+ rid_fid_list list = itr->second;
+ _map.erase(itr);
+ pthread_mutex_unlock(&_mutex);
+ return list;
+}
+
+const u_int32_t
+txn_map::get_rid_count(const std::string& xid) throw (jexception)
+{
+ pthread_mutex_lock(&_mutex);
+ xmap_itr itr = _map.find(xid);
+ pthread_mutex_unlock(&_mutex);
+ if (itr == _map.end()) // not found in map
+ {
+ std::stringstream ss;
+ ss << std::hex << "xid=\"" << xid <<
"\"";
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map",
"get_fid");
+ }
+ return itr->second.size();
+}
+
+void
+txn_map::xid_list(std::vector<std::string>& xv)
+{
+ xv.clear();
+ pthread_mutex_lock(&_mutex);
+ for (xmap_itr itr = _map.begin(); itr != _map.end(); itr++)
+ xv.push_back(itr->first);
+ pthread_mutex_unlock(&_mutex);
+}
+
+} // namespace journal
+} // namespace rhm
Added: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp 2007-10-09 19:53:31 UTC (rev 989)
@@ -0,0 +1,87 @@
+/**
+* \file txn_map.hpp
+*
+* Red Hat Messaging - Message Journal
+*
+* File containing code for class rhm::journal::txn_map (transaction map).
+* See class documentation for details.
+*
+* \author Kim van der Riet
+*
+* Copyright (C) 2007 Red Hat Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#ifndef rhm_journal_txn_map_hpp
+#define rhm_journal_txn_map_hpp
+
+namespace rhm
+{
+namespace journal
+{
+ class txn_map;
+}
+}
+
+#include <map>
+#include <list>
+#include <pthread.h>
+#include <vector>
+#include <jrnl/jexception.hpp>
+
+namespace rhm
+{
+namespace journal
+{
+
+ class txn_map
+ {
+ public:
+ typedef std::pair<u_int64_t, u_int16_t> rid_fid_pair;
+ typedef std::list<rid_fid_pair> rid_fid_list;
+
+ private:
+ typedef std::pair<std::string, rid_fid_list> xmap_param;
+ typedef std::map<std::string, rid_fid_list> xmap;
+ typedef xmap::iterator xmap_itr;
+
+ xmap _map;
+ pthread_mutex_t _mutex;
+
+ public:
+ txn_map();
+ ~txn_map();
+
+ void insert_rid_fid(const std::string& xid, const u_int64_t rid, const
u_int16_t fid)
+ throw (jexception);
+ const rid_fid_list get_rid_fid_list(const std::string& xid) throw
(jexception);
+ const rid_fid_list get_remove_rid_fid_list(const std::string& xid) throw
(jexception);
+ const u_int32_t get_rid_count(const std::string& xid) throw (jexception);
+ inline void clear() { _map.clear(); }
+ inline const bool empty() const { return _map.empty(); }
+ inline const u_int16_t size() const { return (u_int16_t)_map.size(); }
+ void xid_list(std::vector<std::string>& xv);
+ };
+
+} // namespace journal
+} // namespace rhm
+
+#endif // ifndef rhm_journal_txn_map_hpp
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-09 19:44:52 UTC (rev 988)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-09 19:53:31 UTC (rev 989)
@@ -42,8 +42,8 @@
namespace journal
{
-wmgr::wmgr(jcntl* jc, enq_map& emap, wrfc& wrfc):
- pmgr(jc, emap, JRNL_WMGR_PAGE_SIZE, JRNL_WMGR_PAGES),
+wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc):
+ pmgr(jc, emap, tmap, JRNL_WMGR_PAGE_SIZE, JRNL_WMGR_PAGES),
_wrfc(wrfc),
_max_dtokpp(0),
_max_io_wait_us(0),
@@ -57,9 +57,9 @@
_commit_busy(false)
{}
-wmgr::wmgr(jcntl* jc, enq_map& emap, wrfc& wrfc, std::deque<data_tok*>*
const dtokl,
+wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc,
std::deque<data_tok*>* const dtokl,
const u_int32_t max_dtokpp, const u_int32_t max_iowait_us) throw (jexception):
- pmgr(jc, emap, JRNL_WMGR_PAGE_SIZE, JRNL_WMGR_PAGES, dtokl),
+ pmgr(jc, emap, tmap, JRNL_WMGR_PAGE_SIZE, JRNL_WMGR_PAGES, dtokl),
_wrfc(wrfc),
_max_dtokpp(max_dtokpp),
_max_io_wait_us(max_iowait_us),
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-10-09 19:44:52 UTC (rev 988)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-10-09 19:53:31 UTC (rev 989)
@@ -93,9 +93,10 @@
aio_cb _cb; ///< Callback function pointer for AIO events
public:
- wmgr(jcntl* jc, enq_map& emap, wrfc& wrfc);
- wmgr(jcntl* jc, enq_map& emap, wrfc& wrfc, std::deque<data_tok*>*
const dtokl,
- const u_int32_t max_dtokpp, const u_int32_t max_iowait_us) throw
(jexception);
+ wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc);
+ wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc,
+ std::deque<data_tok*>* const dtokl, const u_int32_t max_dtokpp,
+ const u_int32_t max_iowait_us) throw (jexception);
~wmgr();
void initialize(std::deque<data_tok*>* const dtokl, aio_cb wr_cb,
Modified: store/trunk/cpp/tests/jrnl/Makefile.rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-10-09 19:44:52 UTC (rev 988)
+++ store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-10-09 19:53:31 UTC (rev 989)
@@ -52,6 +52,7 @@
jerrno.o \
jinf.o \
enq_map.o \
+ txn_map.o \
jdir.o \
data_tok.o \
file_hdr.o \
Modified: store/trunk/cpp/tests/jrnl/rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/rtest 2007-10-09 19:44:52 UTC (rev 988)
+++ store/trunk/cpp/tests/jrnl/rtest 2007-10-09 19:53:31 UTC (rev 989)
@@ -30,8 +30,8 @@
NUM_JFILES=8
VG_ITERATIONS=1
-#VG_NORM_FILESIZE=11
-VG_NORM_FILESIZE=18 # RHEL5 triggers extra valgrind messages when pthreads are in use
+VG_NORM_FILESIZE=11
+#VG_NORM_FILESIZE=18 # RHEL5 triggers extra valgrind messages when pthreads are in use
# Write test
W_DO_TEST=T
@@ -58,8 +58,8 @@
RM_DIR="${RM} -rf"
TEST_PROG="./jtest"
CHK_PROG="./janalyze.py"
-VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high
--show-reachable=yes"
-#VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high
--show-reachable=yes --suppressions=/usr/lib/valgrind/glibc-2.5.supp"
+#VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high
--show-reachable=yes"
+VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high
--show-reachable=yes --suppressions=/usr/lib/valgrind/glibc-2.5.supp"
MAKE="make -f Makefile.rtest"