Author: kpvdr
Date: 2007-10-11 17:46:52 -0400 (Thu, 11 Oct 2007)
New Revision: 1012
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/deq_rec.cpp
store/trunk/cpp/lib/jrnl/deq_rec.hpp
store/trunk/cpp/lib/jrnl/enq_rec.cpp
store/trunk/cpp/lib/jrnl/enq_rec.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jerrno.cpp
store/trunk/cpp/lib/jrnl/jrec.hpp
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/txn_map.hpp
store/trunk/cpp/lib/jrnl/txn_rec.cpp
store/trunk/cpp/lib/jrnl/txn_rec.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
Log:
Added is_synched() calls, other tidy-ups and bugfixes
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -304,7 +304,7 @@
try
{
- jQueue->recover(prepared); // start recovery
+ jQueue->recover(prepared, key.id); // start recovery
recoverMessages(txn, registry, queue, prepared, messages);
jQueue->recover_complete(); // start journal.
} catch (journal::jexception& e) {
@@ -540,11 +540,11 @@
std::set<string> prepared;
collectPreparedXids(prepared);
- txn_lock_map enqueues;
- txn_lock_map dequeues;
//when using the async journal, it will abort unprepaired xids and populate the locked
maps
if (!usingJrnl()){
- std::set<string> known;
+ txn_lock_map enqueues;
+ txn_lock_map dequeues;
+ std::set<string> known;
readXids(enqueueXidDb, known);
readXids(dequeueXidDb, known);
@@ -557,12 +557,19 @@
}
readLockedMappings(enqueueXidDb, enqueues);
readLockedMappings(dequeueXidDb, dequeues);
- }
-
- for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++)
{
- txns.push_back(new PreparedTransaction(*i, enqueues[*i], dequeues[*i]));
- }
-
+ for (std::set<string>::iterator i = prepared.begin(); i != prepared.end();
i++) {
+ txns.push_back(new PreparedTransaction(*i, enqueues[*i], dequeues[*i]));
+ }
+ } else {
+ for (std::set<string>::iterator i = prepared.begin(); i != prepared.end();
i++) {
+ LockedMappings::shared_ptr enq_ptr;
+ enq_ptr.reset(new LockedMappings);
+ LockedMappings::shared_ptr deq_ptr;
+ deq_ptr.reset(new LockedMappings);
+ txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
+ }
+
+ }
}
void BdbMessageStore::readXids(Db& db, std::set<string>& xids)
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-10-11 21:46:52 UTC (rev 1012)
@@ -113,7 +113,7 @@
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple
/var/rhm/ + queueDir/
string getJrnlDir(const char* queueName);
- static inline bool usingJrnl() {return false;} // make configurable
+ static inline bool usingJrnl() {return true;} // make configurable
string getJrnlBaseDir();
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -37,12 +37,29 @@
void
JournalImpl::recover(std::deque<journal::data_tok*>* rd_dtokl, const
journal::aio_cb rd_cb,
std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
- boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list) throw
(journal::jexception)
+ boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list, u_int64_t
queue_id)
+ throw (journal::jexception)
{
// Create list of prepared xids
std::vector<std::string> prep_xid_list;
for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
- i != prep_tx_list.end(); i++)
+ i != prep_tx_list.end(); i++) {
prep_xid_list.push_back(i->xid);
+ }
+
journal::jcntl::recover(rd_dtokl, rd_cb, wr_dtokl, wr_cb, prep_xid_list);
+
+ // Populate PreparedTransaction lists from _tmap
+ for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
+ i != prep_tx_list.end(); i++) {
+ journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid);
+ assert(tdl.size()); // should never be empty
+ for (journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++)
{
+ if (tdl_itr->_enq_flag) { // enqueue op
+ i->enqueues->add(queue_id, tdl_itr->_rid);
+ } else { // dequeue op
+ i->dequeues->add(queue_id, tdl_itr->_rid);
+ }
+ }
+ }
}
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-10-11 21:46:52 UTC (rev 1012)
@@ -41,14 +41,14 @@
~JournalImpl();
void recover(std::deque<journal::data_tok*>* rd_dtokl, const
journal::aio_cb rd_cb,
std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
- boost::ptr_list<bdbstore::PreparedTransaction>&
prep_tx_list)
- throw (journal::jexception);
+ boost::ptr_list<bdbstore::PreparedTransaction>&
prep_tx_list,
+ u_int64_t queue_id) throw (journal::jexception);
- void recover(boost::ptr_list<bdbstore::PreparedTransaction>&
prep_tx_list)
- throw (journal::jexception)
+ void recover(boost::ptr_list<bdbstore::PreparedTransaction>&
prep_tx_list,
+ u_int64_t queue_id) throw (journal::jexception)
{
recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback,
&_aio_wr_cmpl_dtok_list,
- &aio_wr_callback, prep_tx_list);
+ &aio_wr_callback, prep_tx_list, queue_id);
}
};
Modified: store/trunk/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -319,11 +319,18 @@
}
const bool
-deq_rec::rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks) throw
(jexception)
+deq_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
{
- if (rec_offs_dblks) // Contunue decoding xid from previous decode call
+ if (rec_offs) // Contunue decoding xid from previous decode call
{
- // TODO
+ ifsp->read((char*)_buff + rec_offs, _deq_hdr._xidsize - rec_offs);
+ size_t size_read = ifsp->gcount();
+ if (size_read < _deq_hdr._xidsize - rec_offs)
+ {
+ assert(ifsp->eof());
+ rec_offs += size_read;
+ return false;
+ }
}
else // Start at beginning of record
{
@@ -347,24 +354,17 @@
}
// Decode xid
ifsp->read((char*)_buff, _deq_hdr._xidsize);
- if ((size_t)ifsp->gcount() == _deq_hdr._xidsize)
+ size_t size_read = ifsp->gcount();
+ if (size_read < _deq_hdr._xidsize)
{
- ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_deq_hdr) -
- _deq_hdr._xidsize);
- return true;
+ assert(ifsp->eof());
+ rec_offs = size_read;
+ return false;
}
- else
- ; // TODO
}
- else
- {
- // Igore rest of record
- rec_offs_dblks = rec_size_dblks();
- ifsp->ignore(rec_offs_dblks * JRNL_DBLK_SIZE - sizeof(_deq_hdr));
- return true;
- }
}
- return false;
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_deq_hdr) -
_deq_hdr._xidsize);
+ return true;
}
const size_t
Modified: store/trunk/cpp/lib/jrnl/deq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -78,8 +78,8 @@
const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
u_int32_t max_size_dblks) throw (jexception);
// Decode used for recover
- const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
- throw (jexception);
+ const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw
(jexception);
+
inline const u_int64_t deq_rid() const { return _deq_hdr._deq_rid; }
const size_t get_xid(void** const xidpp);
std::string& str(std::string& str) const;
Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -428,11 +428,18 @@
}
const bool
-enq_rec::rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks) throw
(jexception)
+enq_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
{
- if (rec_offs_dblks) // Contunue decoding xid from previous decode call
+ if (rec_offs) // Contunue decoding xid from previous decode call
{
- // TODO
+ ifsp->read((char*)_buff + rec_offs, _enq_hdr._xidsize - rec_offs);
+ size_t size_read = ifsp->gcount();
+ if (size_read < _enq_hdr._xidsize - rec_offs)
+ {
+ assert(ifsp->eof());
+ rec_offs += size_read;
+ return false;
+ }
}
else // Start at beginning of record
{
@@ -462,23 +469,17 @@
}
// Decode xid
ifsp->read((char*)_buff, _enq_hdr._xidsize);
- if ((size_t)ifsp->gcount() == _enq_hdr._xidsize)
+ size_t size_read = ifsp->gcount();
+ if (size_read < _enq_hdr._xidsize)
{
- ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_enq_hdr) -
- _enq_hdr._xidsize);
- return true;
+ assert(ifsp->eof());
+ rec_offs = size_read;
+ return false;
}
- else
- ; // TODO
}
- else
- {
- // Igore rest of record
- ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_enq_hdr));
- return true;
- }
}
- return false;
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_enq_hdr) -
_enq_hdr._xidsize);
+ return true;
}
const size_t
Modified: store/trunk/cpp/lib/jrnl/enq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -89,8 +89,8 @@
const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
u_int32_t max_size_dblks) throw (jexception);
// Decode used for recover
- const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
- throw (jexception);
+ const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw
(jexception);
+
const size_t get_xid(void** const xidpp);
const size_t get_data(void** const datapp);
inline const bool is_transient() const { return _enq_hdr.is_transient(); }
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -255,9 +255,9 @@
}
const bool
-jcntl::is_txn_synced(const std::string& /*xid*/) throw (jexception)
+jcntl::is_txn_synced(const std::string& xid) throw (jexception)
{
- return RHM_IORES_NOTIMPL;
+ return _tmap.is_txn_synced(xid);
}
const u_int32_t
@@ -375,7 +375,7 @@
jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
const std::vector<std::string>& prep_txn_list) throw (jexception)
{
- u_int32_t dblks_read = 0;
+ size_t cum_size_read = 0;
bool done = false;
void* xidp = NULL;
hdr h;
@@ -389,7 +389,7 @@
enq_rec er;
while (!done)
{
- done = er.rcv_decode(h, ifsp, dblks_read);
+ done = er.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
rd._enq_cnt_list[fid]++;
@@ -417,7 +417,7 @@
deq_rec dr;
while (!done)
{
- done = dr.rcv_decode(h, ifsp, dblks_read);
+ done = dr.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
if (dr.xid_size())
@@ -458,7 +458,7 @@
txn_rec ar;
while (!done)
{
- done = ar.rcv_decode(h, ifsp, dblks_read);
+ done = ar.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
// Delete this txn from tmap, unlock any locked records in emap
@@ -493,7 +493,7 @@
txn_rec cr;
while (!done)
{
- done = cr.rcv_decode(h, ifsp, dblks_read);
+ done = cr.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
// Delete this txn from tmap, process records into emap
Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -204,8 +204,8 @@
// class enq_map, txn_map
_err_map[JERR_MAP_DUPLICATE] = std::string("JERR_MAP_DUPLICATE: "
- "Attempted to insert enqueue record using duplicate key.");
- _err_map[JERR_MAP_NOTFOUND] = std::string("JERR_MAP_NOTFOUND: Key not found in
enqueue map.");
+ "Attempted to insert record into map using duplicate key.");
+ _err_map[JERR_MAP_NOTFOUND] = std::string("JERR_MAP_NOTFOUND: Key not found in
map.");
_err_map[JERR_MAP_LOCKED] = std::string("JERR_MAP_LOCKED: "
"Record ID locked by a pending transaction.");
Modified: store/trunk/cpp/lib/jrnl/jrec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jrec.hpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/jrec.hpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -150,7 +150,7 @@
virtual const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
u_int32_t max_size_dblks) throw (jexception) = 0;
- virtual const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t&
rec_offs_dblks)
+ virtual const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs)
throw (jexception) = 0;
virtual std::string& str(std::string& str) const = 0;
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -44,7 +44,8 @@
txn_data_struct::txn_data_struct(const u_int64_t rid, const u_int16_t fid, const bool
enq_flag):
_rid(rid),
_fid(fid),
- _enq_flag(enq_flag)
+ _enq_flag(enq_flag),
+ _aio_compl(false)
{}
txn_map::txn_map():
@@ -58,21 +59,24 @@
pthread_mutex_destroy(&_mutex);
}
-void
+const bool
txn_map::insert_txn_data(const std::string& xid, const txn_data& td) throw
(jexception)
{
+ bool ok = true;
pthread_mutex_lock(&_mutex);
xmap_itr itr = _map.find(xid);
- pthread_mutex_unlock(&_mutex);
if (itr == _map.end()) // not found in map
{
txn_data_list list;
list.push_back(td);
std::pair<xmap_itr, bool> ret = _map.insert(xmap_param(xid, list));
- // TODO: check for failure here?
+ if (!ret.second) // duplicate
+ ok = false;
}
else
itr->second.push_back(td);
+ pthread_mutex_unlock(&_mutex);
+ return ok;
}
const txn_data_list
@@ -85,7 +89,7 @@
{
std::stringstream ss;
ss << std::hex << "xid=\"" << xid <<
"\"";
- throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map",
"get_fid");
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map",
"txn_data_list");
}
return itr->second;
}
@@ -100,7 +104,7 @@
pthread_mutex_unlock(&_mutex);
std::stringstream ss;
ss << std::hex << "xid=\"" << xid <<
"\"";
- throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map",
"get_remove_fid");
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map",
"get_remove_tdata_list");
}
txn_data_list list = itr->second;
_map.erase(itr);
@@ -118,11 +122,69 @@
{
std::stringstream ss;
ss << std::hex << "xid=\"" << xid <<
"\"";
- throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map",
"get_fid");
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map",
"get_rid_count");
}
return itr->second.size();
}
+const bool
+txn_map::is_txn_synced(const std::string& xid) throw (jexception)
+{
+ pthread_mutex_lock(&_mutex);
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // not found in map
+ {
+ pthread_mutex_unlock(&_mutex);
+ std::stringstream ss;
+ ss << std::hex << "xid=\"" << xid <<
"\"";
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map",
"is_txn_synced");
+ }
+ txn_data_list list = itr->second;
+ bool is_synced = true;
+ for (tdl_itr litr = list.begin(); litr < list.end(); litr++)
+ {
+ if (!litr->_aio_compl)
+ {
+ is_synced = false;
+ break;
+ }
+ }
+ pthread_mutex_unlock(&_mutex);
+ return is_synced;
+}
+
+const bool
+txn_map::set_aio_compl(const std::string& xid, const u_int64_t rid)
+{
+ bool ok = true;
+ bool found = false;
+ pthread_mutex_lock(&_mutex);
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // not found in map
+ ok = false;
+ else
+ {
+ txn_data_list list = itr->second;
+ for (tdl_itr litr = list.begin(); litr < list.end(); litr++)
+ {
+ if (litr->_rid == rid)
+ {
+ found = true;
+ litr->_aio_compl = true;
+ break;
+ }
+ }
+ }
+ pthread_mutex_unlock(&_mutex);
+ if (ok && !found)
+ {
+ std::stringstream ss;
+ ss << std::hex << "xid=\"" << xid <<
"\" rid=" << rid;
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map",
"set_aio_compl");
+ }
+ return ok;
+}
+
void
txn_map::xid_list(std::vector<std::string>& xv)
{
Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -53,9 +53,10 @@
struct txn_data_struct
{
- u_int64_t _rid;
- u_int16_t _fid;
- bool _enq_flag;
+ u_int64_t _rid; ///< Record id for this operation
+ u_int16_t _fid; ///< File id, to be used when transferring to emap on
commit
+ bool _enq_flag; ///< If true, enq op, otherwise deq op
+ bool _aio_compl; ///< Initially false, set to true when AIO returns
txn_data_struct(const u_int64_t rid, const u_int16_t fid, const bool enq_flag);
};
typedef txn_data_struct txn_data;
@@ -76,10 +77,12 @@
txn_map();
~txn_map();
- void insert_txn_data(const std::string& xid, const txn_data& td) throw
(jexception);
+ const bool insert_txn_data(const std::string& xid, const txn_data& td)
throw (jexception);
const txn_data_list get_tdata_list(const std::string& xid) throw
(jexception);
const txn_data_list get_remove_tdata_list(const std::string& xid) throw
(jexception);
const u_int32_t get_rid_count(const std::string& xid) throw (jexception);
+ const bool is_txn_synced(const std::string& xid) throw (jexception);
+ const bool set_aio_compl(const std::string& xid, const u_int64_t rid);
inline void clear() { _map.clear(); }
inline const bool empty() const { return _map.empty(); }
inline const u_int16_t size() const { return (u_int16_t)_map.size(); }
Modified: store/trunk/cpp/lib/jrnl/txn_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/txn_rec.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -314,11 +314,18 @@
}
const bool
-txn_rec::rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks) throw
(jexception)
+txn_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
{
- if (rec_offs_dblks) // Contunue decoding xid from previous decode call
+ if (rec_offs) // Contunue decoding xid from previous decode call
{
- // TODO
+ ifsp->read((char*)_buff + rec_offs, _txn_hdr._xidsize - rec_offs);
+ size_t size_read = ifsp->gcount();
+ if (size_read < _txn_hdr._xidsize - rec_offs)
+ {
+ assert(ifsp->eof());
+ rec_offs += size_read;
+ return false;
+ }
}
else // Start at beginning of record
{
@@ -339,15 +346,16 @@
}
// Decode xid
ifsp->read((char*)_buff, _txn_hdr._xidsize);
- if ((size_t)ifsp->gcount() == _txn_hdr._xidsize)
+ size_t size_read = ifsp->gcount();
+ if (size_read < _txn_hdr._xidsize)
{
- ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_txn_hdr) -
_txn_hdr._xidsize);
- return true;
+ assert(ifsp->eof());
+ rec_offs = size_read;
+ return false;
}
- else
- ; // TODO
}
- return false;
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_txn_hdr) -
_txn_hdr._xidsize);
+ return true;
}
const size_t
Modified: store/trunk/cpp/lib/jrnl/txn_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.hpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/txn_rec.hpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -78,8 +78,8 @@
const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
u_int32_t max_size_dblks) throw (jexception);
// Decode used for recover
- const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
- throw (jexception);
+ const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw
(jexception);
+
const size_t get_xid(void** const xidpp);
std::string& str(std::string& str) const;
inline const size_t data_size() const { return 0; } // This record never carries
data
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -123,7 +123,6 @@
_enq_busy = true;
u_int64_t rid = initialize_rid(cont, dtokp);
-
_enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, transient);
if (!cont)
{
@@ -262,8 +261,14 @@
}
u_int64_t rid = initialize_rid(cont, dtokp);
-
_deq_rec.reset(rid, dtokp->rid(), xid_ptr, xid_len);
+ if (!cont)
+ {
+ if (xid_len)
+ dtokp->set_xid(xid_ptr, xid_len);
+ else
+ dtokp->clear_xid();
+ }
bool done = false;
while (!done)
{
@@ -392,6 +397,14 @@
u_int64_t rid = initialize_rid(cont, dtokp);
_txn_rec.reset(RHM_JDAT_TXA_MAGIC, rid, xid_ptr, xid_len);
+ if (!cont)
+ {
+ dtokp->set_rid(rid);
+ if (xid_len)
+ dtokp->set_xid(xid_ptr, xid_len);
+ else
+ dtokp->clear_xid();
+ }
bool done = false;
while (!done)
{
@@ -516,6 +529,14 @@
u_int64_t rid = initialize_rid(cont, dtokp);
_txn_rec.reset(RHM_JDAT_TXC_MAGIC, rid, xid_ptr, xid_len);
+ if (!cont)
+ {
+ dtokp->set_rid(rid);
+ if (xid_len)
+ dtokp->set_xid(xid_ptr, xid_len);
+ else
+ dtokp->clear_xid();
+ }
bool done = false;
while (!done)
{
@@ -750,6 +771,8 @@
throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, ss.str(),
"wmgr",
"get_events");
}
+ if (dtp->has_xid())
+ _tmap.set_aio_compl(dtp->xid(), dtp->rid());
_dtokl->push_back(dtp);
}
tot_data_toks += s;
@@ -766,6 +789,7 @@
}
else // File header writes have no pcb
{
+ // get fid from original file header record, update pointers for that fid
file_hdr* fhp = (file_hdr*)iocbp->u.c.buf;
u_int32_t fid = fhp->_fid;
nlfh* nlfhp = _wrfc.file_handle(fid);