Author: kpvdr
Date: 2007-10-10 10:38:09 -0400 (Wed, 10 Oct 2007)
New Revision: 996
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/enq_map.cpp
store/trunk/cpp/lib/jrnl/enq_map.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rrfc.hpp
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/txn_map.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
store/trunk/cpp/tests/jrnl/rtest
Log:
Added transaction handling. Only recover now remains incomplete to get transactions done.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-10-10 14:38:09 UTC (rev 996)
@@ -33,3 +33,16 @@
JournalImpl::~JournalImpl()
{}
+
+void
+JournalImpl::recover(std::deque<journal::data_tok*>* rd_dtokl, const
journal::aio_cb rd_cb,
+ std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
+ boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list) throw
(journal::jexception)
+{
+ // Create list of prepared xids
+ std::vector<std::string> prep_xid_list;
+ for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
+ i != prep_tx_list.end(); i++)
+ prep_xid_list.push_back(i->xid);
+ journal::jcntl::recover(rd_dtokl, rd_cb, wr_dtokl, wr_cb, prep_xid_list);
+}
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-10-10 14:38:09 UTC (rev 996)
@@ -25,6 +25,9 @@
#define _JournalImpl_
#include "jrnl/jcntl.hpp"
+#include "jrnl/data_tok.hpp"
+#include "PreparedTransaction.h"
+#include <boost/ptr_container/ptr_list.hpp>
namespace rhm {
namespace bdbstore {
@@ -36,6 +39,17 @@
const std::string& journalDirectory,
const std::string& journalBaseFilename);
~JournalImpl();
+ void recover(std::deque<journal::data_tok*>* rd_dtokl, const
journal::aio_cb rd_cb,
+ std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
+ boost::ptr_list<bdbstore::PreparedTransaction>&
prep_tx_list)
+ throw (journal::jexception);
+
+ void recover(boost::ptr_list<bdbstore::PreparedTransaction>&
prep_tx_list)
+ throw (journal::jexception)
+ {
+ recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback,
&_aio_wr_cmpl_dtok_list,
+ &aio_wr_callback, prep_tx_list);
+ }
};
} // namespace bdbstore
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-10-10 14:38:09 UTC (rev 996)
@@ -108,6 +108,7 @@
size_t _dsize; ///< Data size in bytes
u_int32_t _dblks_written; ///< Data blocks read/written
u_int32_t _dblks_read; ///< Data blocks read/written
+ u_int16_t _fid; ///< FID containing header of enqueue record
u_int64_t _rid; ///< RID of data set by enqueue operation
std::string _xid; ///< XID set by enqueue operation
u_int64_t _dequeue_rid; ///< RID of data set by dequeue operation
@@ -146,6 +147,8 @@
inline void incr_dblocks_read(u_int32_t dblks_read) { _dblks_read += dblks_read;
}
inline void set_dblocks_read(u_int32_t dblks_read) { _dblks_read = dblks_read; }
+ inline const u_int16_t fid() const { return _fid; }
+ inline void set_fid(const u_int16_t fid) { _fid = fid; }
inline const u_int64_t rid() const { return _rid; }
inline void set_rid(const u_int64_t rid) { _rid = rid; }
inline const u_int64_t dequeue_rid() const throw (jexception) {return
_dequeue_rid; }
Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp 2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp 2007-10-10 14:38:09 UTC (rev 996)
@@ -120,7 +120,7 @@
}
void
-enq_map::lock(const u_int64_t rid)
+enq_map::lock(const u_int64_t rid) throw (jexception)
{
pthread_mutex_lock(&_mutex);
emap_itr itr = _map.find(rid);
@@ -136,7 +136,7 @@
}
void
-enq_map::unlock(const u_int64_t rid)
+enq_map::unlock(const u_int64_t rid) throw (jexception)
{
pthread_mutex_lock(&_mutex);
emap_itr itr = _map.find(rid);
@@ -152,7 +152,7 @@
}
const bool
-enq_map::is_locked(const u_int64_t rid)
+enq_map::is_locked(const u_int64_t rid) throw (jexception)
{
pthread_mutex_lock(&_mutex);
emap_itr itr = _map.find(rid);
Modified: store/trunk/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.hpp 2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/enq_map.hpp 2007-10-10 14:38:09 UTC (rev 996)
@@ -75,9 +75,9 @@
void insert_fid(const u_int64_t rid, const u_int16_t fid, bool locked) throw
(jexception);
const u_int16_t get_fid(const u_int64_t rid) throw (jexception);
const u_int16_t get_remove_fid(const u_int64_t rid) throw (jexception);
- void lock(const u_int64_t rid);
- void unlock(const u_int64_t rid);
- const bool is_locked(const u_int64_t rid);
+ void lock(const u_int64_t rid) throw (jexception);
+ void unlock(const u_int64_t rid) throw (jexception);
+ const bool is_locked(const u_int64_t rid) throw (jexception);
inline void clear() { _map.clear(); }
inline const bool empty() const { return _map.empty(); }
inline const u_int16_t size() const { return (u_int16_t)_map.size(); }
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-10 14:38:09 UTC (rev 996)
@@ -123,21 +123,14 @@
void
jcntl::recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
std::deque<data_tok*>* wr_dtokl,
- const aio_cb wr_cb, boost::ptr_list<bdbstore::PreparedTransaction>&
prep_tx_list)
- throw (jexception)
+ const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list)
throw (jexception)
{
- // Create list of prepared xids
- std::set<std::string> prep_xid_list;
- for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
- i != prep_tx_list.end(); i++);
-// TODO!
-// prep_xid_list.insert(i->??);
-
// Verify journal dir and journal files
_jdir.verify_dir();
_rcvdat.reset();
_emap.clear();
- rcvr_janalyze(_rcvdat);
+ _tmap.clear();
+ rcvr_janalyze(_rcvdat, prep_txn_list);
if (_datafh)
{
@@ -354,7 +347,7 @@
}
void
-jcntl::rcvr_janalyze(rcvdat& rd) throw (jexception)
+jcntl::rcvr_janalyze(rcvdat& rd, const std::vector<std::string>&
prep_txn_list) throw (jexception)
{
jinf ji(_jdir.dirname() + "/" + _base_filename + "." +
JRNL_INFO_EXTENSION, true);
try
@@ -367,7 +360,6 @@
if (e.err_code() != jerrno::JERR_JINF_JDATEMPTY)
throw e;
}
-//std::cout << "f" << rd._ffid <<
(rd._empty?"e":"") << " ";
// Restore all read and write pointers
if (!rd._empty)
@@ -376,13 +368,14 @@
for (u_int16_t fnum=0; fnum<JRNL_NUM_FILES && !eoj; fnum++)
{
u_int16_t fid = (fnum + rd._ffid) % JRNL_NUM_FILES;
- eoj = rcvr_fanalyze(fid, rd);
+ eoj = rcvr_fanalyze(fid, rd, prep_txn_list);
}
}
}
const bool
-jcntl::rcvr_fanalyze(u_int16_t fid, rcvdat& rd) throw (jexception)
+jcntl::rcvr_fanalyze(u_int16_t fid, rcvdat& rd, const
std::vector<std::string>& /*prep_txn_list*/)
+ throw (jexception)
{
bool eoj = false;
std::stringstream ss;
@@ -466,6 +459,12 @@
jifs.seekg(foffs);
}
break;
+ case RHM_JDAT_TXA_MAGIC:
+//std::cout << " a";
+ break;
+ case RHM_JDAT_TXC_MAGIC:
+//std::cout << " c";
+ break;
case RHM_JDAT_EMPTY_MAGIC:
{
//std::cout << " x";
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-10-10 14:38:09 UTC (rev 996)
@@ -49,8 +49,6 @@
#include <jrnl/wmgr.hpp>
#include <jrnl/wrfc.hpp>
#include <qpid/broker/PersistableQueue.h>
-#include <PreparedTransaction.h>
-#include <boost/ptr_container/ptr_list.hpp>
namespace rhm
{
@@ -69,7 +67,7 @@
*/
class jcntl : public qpid::broker::ExternalQueueStore
{
- private:
+ protected:
/**
* \brief Journal ID
*
@@ -226,7 +224,7 @@
*/
void recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb,
- boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list) throw
(jexception);
+ const std::vector<std::string>& prep_txn_list) throw (jexception);
/**
* \brief Recover using internal default callbacks and data_tok lists.
@@ -235,11 +233,10 @@
*
* \exception TODO
*/
- void recover(boost::ptr_list<bdbstore::PreparedTransaction>&
prep_tx_list)
- throw (jexception)
+ void recover(const std::vector<std::string>& prep_txn_list) throw
(jexception)
{
recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback,
&_aio_wr_cmpl_dtok_list,
- &aio_wr_callback, prep_tx_list);
+ &aio_wr_callback, prep_txn_list);
}
/**
@@ -590,7 +587,7 @@
- private:
+ protected:
/**
* \brief Check status of journal before allowing write operations.
*/
@@ -614,14 +611,16 @@
/**
* \brief Analyze journal for recovery.
*/
- void rcvr_janalyze(rcvdat& jrs) throw (jexception);
+ void rcvr_janalyze(rcvdat& jrs, const std::vector<std::string>&
prep_txn_list)
+ throw (jexception);
/**
* \brief Analyze a particular journal file for recovery.
*
* \return <b><i>true</i></b> if end of journal (eoj)
found; <b><i>false</i></b> otherwise.
*/
- const bool rcvr_fanalyze(u_int16_t fid, rcvdat& jrs) throw (jexception);
+ const bool rcvr_fanalyze(u_int16_t fid, rcvdat& jrs,
+ const std::vector<std::string>& prep_txn_list) throw
(jexception);
/**
* Intenal callback write
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-10-10 14:38:09 UTC (rev 996)
@@ -295,6 +295,19 @@
if (is_enq) // ok, this record is enqueued, check it, then read it...
{
//std::cout << "e" << std::flush;
+#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
+ // Is this locked by a pending dequeue transaction?
+ try
+ {
+ if (_emap.is_locked(_hdr._rid))
+ return RHM_IORES_TXPENDING;
+ }
+ catch (jexception e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+ throw e;
+ }
+#endif
if (dtokp->rid())
{
if (_hdr._rid != dtokp->rid())
Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-10-10 14:38:09 UTC (rev 996)
@@ -104,7 +104,10 @@
inline const int fh() const { return _curr_fh->rd_fh(); }
inline const u_int32_t enqcnt() const { return _curr_fh->enqcnt(); }
inline const u_int32_t incr_enqcnt() { return _curr_fh->incr_enqcnt(); }
+ inline const u_int32_t incr_enqcnt(u_int16_t fid) { return
_fh_arr[fid]->incr_enqcnt(); }
inline const u_int32_t add_enqcnt(u_int32_t a) { return
_curr_fh->add_enqcnt(a); }
+ inline const u_int32_t add_enqcnt(u_int16_t fid, u_int32_t a)
+ { return _fh_arr[fid]->add_enqcnt(a); }
inline const u_int32_t decr_enqcnt(u_int16_t fid) { return
_fh_arr[fid]->decr_enqcnt(); }
inline const u_int32_t subtr_enqcnt(u_int16_t fid, u_int32_t s)
{ return _fh_arr[fid]->subtr_enqcnt(s); }
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2007-10-10 14:38:09 UTC (rev 996)
@@ -41,6 +41,12 @@
namespace journal
{
+txn_data_struct::txn_data_struct(const u_int64_t rid, const u_int16_t fid, const bool
enq_flag):
+ _rid(rid),
+ _fid(fid),
+ _enq_flag(enq_flag)
+{}
+
txn_map::txn_map():
_map()
{
@@ -53,25 +59,24 @@
}
void
-txn_map::insert_rid_fid(const std::string& xid, const u_int64_t rid, const u_int16_t
fid)
- throw (jexception)
+txn_map::insert_txn_data(const std::string& xid, const txn_data& td) throw
(jexception)
{
- rid_fid_pair rec(rid, fid);
pthread_mutex_lock(&_mutex);
xmap_itr itr = _map.find(xid);
pthread_mutex_unlock(&_mutex);
if (itr == _map.end()) // not found in map
{
- rid_fid_list list;
- list.push_back(rec);
+ txn_data_list list;
+ list.push_back(td);
std::pair<xmap_itr, bool> ret = _map.insert(xmap_param(xid, list));
+ // TODO: check for failure here?
}
else
- itr->second.push_back(rec);
+ itr->second.push_back(td);
}
-const txn_map::rid_fid_list
-txn_map::get_rid_fid_list(const std::string& xid) throw (jexception)
+const txn_data_list
+txn_map::get_tdata_list(const std::string& xid) throw (jexception)
{
pthread_mutex_lock(&_mutex);
xmap_itr itr = _map.find(xid);
@@ -85,8 +90,8 @@
return itr->second;
}
-const txn_map::rid_fid_list
-txn_map::get_remove_rid_fid_list(const std::string& xid) throw (jexception)
+const txn_data_list
+txn_map::get_remove_tdata_list(const std::string& xid) throw (jexception)
{
pthread_mutex_lock(&_mutex);
xmap_itr itr = _map.find(xid);
@@ -97,7 +102,7 @@
ss << std::hex << "xid=\"" << xid <<
"\"";
throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map",
"get_remove_fid");
}
- rid_fid_list list = itr->second;
+ txn_data_list list = itr->second;
_map.erase(itr);
pthread_mutex_unlock(&_mutex);
return list;
Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp 2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp 2007-10-10 14:38:09 UTC (rev 996)
@@ -42,7 +42,6 @@
}
#include <map>
-#include <list>
#include <pthread.h>
#include <vector>
#include <jrnl/jexception.hpp>
@@ -52,15 +51,22 @@
namespace journal
{
+ struct txn_data_struct
+ {
+ u_int64_t _rid;
+ u_int16_t _fid;
+ bool _enq_flag;
+ txn_data_struct(const u_int64_t rid, const u_int16_t fid, const bool enq_flag);
+ };
+ typedef txn_data_struct txn_data;
+ typedef std::vector<txn_data> txn_data_list;
+ typedef txn_data_list::iterator tdl_itr;
+
class txn_map
{
- public:
- typedef std::pair<u_int64_t, u_int16_t> rid_fid_pair;
- typedef std::list<rid_fid_pair> rid_fid_list;
-
private:
- typedef std::pair<std::string, rid_fid_list> xmap_param;
- typedef std::map<std::string, rid_fid_list> xmap;
+ typedef std::pair<std::string, txn_data_list> xmap_param;
+ typedef std::map<std::string, txn_data_list> xmap;
typedef xmap::iterator xmap_itr;
xmap _map;
@@ -70,10 +76,9 @@
txn_map();
~txn_map();
- void insert_rid_fid(const std::string& xid, const u_int64_t rid, const
u_int16_t fid)
- throw (jexception);
- const rid_fid_list get_rid_fid_list(const std::string& xid) throw
(jexception);
- const rid_fid_list get_remove_rid_fid_list(const std::string& xid) throw
(jexception);
+ void insert_txn_data(const std::string& xid, const txn_data& td) throw
(jexception);
+ const txn_data_list get_tdata_list(const std::string& xid) throw
(jexception);
+ const txn_data_list get_remove_tdata_list(const std::string& xid) throw
(jexception);
const u_int32_t get_rid_count(const std::string& xid) throw (jexception);
inline void clear() { _map.clear(); }
inline const bool empty() const { return _map.empty(); }
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-10 14:38:09 UTC (rev 996)
@@ -141,13 +141,10 @@
u_int32_t data_offs_dblks = dtokp->dblocks_written();
u_int32_t ret = _enq_rec.encode(wptr, data_offs_dblks,
(JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
-#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
+
+ // Remember fid which contains the record header in case record is split over
several files
if (data_offs_dblks == 0)
- {
- _wrfc.incr_enqcnt();
- _emap.insert_fid(rid, _wrfc.index());
- }
-#endif
+ dtokp->set_fid(_wrfc.index());
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
dtokp->incr_dblocks_written(ret);
@@ -163,6 +160,18 @@
// message. AIO callbacks will then only process this token when entire
message is
// enqueued.
_page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
+ _wrfc.incr_enqcnt(dtokp->fid());
+
+#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
+ if (xid_len) // If part of transaction, add to transaction map
+ {
+ std::string xid((char*)xid_ptr, xid_len);
+ _tmap.insert_txn_data(xid, txn_data(rid, dtokp->fid(), true));
+ }
+ else
+ _emap.insert_fid(rid, dtokp->fid());
+#endif
+
done = true;
}
@@ -263,13 +272,10 @@
u_int32_t data_offs_dblks = dtokp->dblocks_written();
u_int32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
(JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
-#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
+
+ // Remember fid which contains the record header in case record is split over
several files
if (data_offs_dblks == 0)
- {
- u_int16_t fid = _emap.get_remove_fid(dtokp->rid());
- _wrfc.decr_enqcnt(fid);
- }
-#endif
+ dtokp->set_fid(_wrfc.index());
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
dtokp->incr_dblocks_written(ret);
@@ -280,6 +286,23 @@
// TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO
returns.
dtokp->set_wstate(data_tok::DEQ_SUBM);
_page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
+
+#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
+ if (xid_len) // If part of transaction, add to transaction map
+ {
+ // If the enqueue is part of a pending txn, it will not yet be in emap
+ try { _emap.lock(rid); }
+ catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
throw e; }
+ std::string xid((char*)xid_ptr, xid_len);
+ _tmap.insert_txn_data(xid, txn_data(rid, dtokp->fid(), false));
+ }
+ else
+ {
+ u_int16_t fid = _emap.get_remove_fid(dtokp->rid());
+ _wrfc.decr_enqcnt(fid);
+ }
+#endif
+
done = true;
}
@@ -377,6 +400,10 @@
u_int32_t data_offs_dblks = dtokp->dblocks_written();
u_int32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
(JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
+
+ // Remember fid which contains the record header in case record is split over
several files
+ if (data_offs_dblks == 0)
+ dtokp->set_fid(_wrfc.index());
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
dtokp->incr_dblocks_written(ret);
@@ -386,6 +413,20 @@
{
dtokp->set_wstate(data_tok::ABORT_SUBM);
_page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
+
+#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
+ // Delete this txn from tmap, unlock any locked records in emap
+ std::string xid((char*)xid_ptr, xid_len);
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ {
+ try { _emap.unlock(itr->_rid); }
+ catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
throw e; }
+ if (itr->_enq_flag)
+ _wrfc.decr_enqcnt(itr->_fid);
+ }
+#endif
+
done = true;
}
@@ -483,6 +524,10 @@
u_int32_t data_offs_dblks = dtokp->dblocks_written();
u_int32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
(JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
+
+ // Remember fid which contains the record header in case record is split over
several files
+ if (data_offs_dblks == 0)
+ dtokp->set_fid(_wrfc.index());
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
dtokp->incr_dblocks_written(ret);
@@ -492,6 +537,23 @@
{
dtokp->set_wstate(data_tok::COMMIT_SUBM);
_page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
+
+#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
+ // Delete this txn from tmap, process records into emap
+ std::string xid((char*)xid_ptr, xid_len);
+ txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+ for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ {
+ if (itr->_enq_flag) // txn enqueue
+ _emap.insert_fid(itr->_rid, itr->_fid);
+ else // txn dequeue
+ {
+ u_int16_t fid = _emap.get_remove_fid(dtokp->rid());
+ _wrfc.decr_enqcnt(fid);
+ }
+ }
+#endif
+
done = true;
}
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-10-10 14:38:09 UTC (rev 996)
@@ -33,10 +33,8 @@
#include "msg_producer.hpp"
#include "msg_consumer.hpp"
#include "jtest.hpp"
+#include <vector>
-#include <PreparedTransaction.h>
-#include <boost/ptr_container/ptr_list.hpp>
-
#define NUM_MSGS 5
#define MAX_AIO_SLEEPS 500
#define AIO_SLEEP_TIME 1000
@@ -161,7 +159,7 @@
void EmptyRecoverTest()
{
- boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
+ std::vector<std::string> txn_list;
//Stack
char* test_name = "EmptyRecoverTest_Stack";
try
@@ -264,7 +262,7 @@
void RecoverReadTest()
{
- boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
+ std::vector<std::string> txn_list;
//Stack
char* test_name = "RecoverReadTest_Stack";
try
@@ -334,7 +332,7 @@
void RecoveredReadTest()
{
- boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
+ std::vector<std::string> txn_list;
//Stack
char* test_name = "RecoveredReadTest_Stack";
try
@@ -419,7 +417,7 @@
void RecoveredDequeueTest()
{
- boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
+ std::vector<std::string> txn_list;
//Stack
char* test_name = "RecoveredDequeueTest_Stack";
try
@@ -508,7 +506,7 @@
void ComplexRecoveryTest1()
{
- boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
+ std::vector<std::string> txn_list;
//Stack
char* test_name = "ComplexRecoveryTest1_Stack";
try
Modified: store/trunk/cpp/tests/jrnl/rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/rtest 2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/tests/jrnl/rtest 2007-10-10 14:38:09 UTC (rev 996)
@@ -30,8 +30,8 @@
NUM_JFILES=8
VG_ITERATIONS=1
-VG_NORM_FILESIZE=11
-#VG_NORM_FILESIZE=18 # RHEL5 triggers extra valgrind messages when pthreads are in use
+#VG_NORM_FILESIZE=11
+VG_NORM_FILESIZE=18 # RHEL5 triggers extra valgrind messages when pthreads are in use
# Write test
W_DO_TEST=T
@@ -58,8 +58,8 @@
RM_DIR="${RM} -rf"
TEST_PROG="./jtest"
CHK_PROG="./janalyze.py"
-#VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high
--show-reachable=yes"
-VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high
--show-reachable=yes --suppressions=/usr/lib/valgrind/glibc-2.5.supp"
+VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high
--show-reachable=yes"
+#VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high
--show-reachable=yes --suppressions=/usr/lib/valgrind/glibc-2.5.supp"
MAKE="make -f Makefile.rtest"