Author: kpvdr
Date: 2007-09-27 11:18:31 -0400 (Thu, 27 Sep 2007)
New Revision: 948
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/jrnl/deq_rec.cpp
store/trunk/cpp/lib/jrnl/deq_rec.hpp
store/trunk/cpp/lib/jrnl/dtx_rec.cpp
store/trunk/cpp/lib/jrnl/dtx_rec.hpp
store/trunk/cpp/lib/jrnl/enq_rec.cpp
store/trunk/cpp/lib/jrnl/enq_rec.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jrec.cpp
store/trunk/cpp/lib/jrnl/jrec.hpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
store/trunk/cpp/tests/jrnl/msg_consumer.cpp
store/trunk/cpp/tests/jrnl/msg_producer.cpp
Log:
Encode and decode paths for enq_rec, dec_rec, dtx_rec completed. The encoding of xids not
yet tested, however, but all tests which do not have xid still ok.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-25 20:25:49 UTC (rev 947)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-27 15:18:31 UTC (rev 948)
@@ -415,7 +415,7 @@
std:: cout << "loop -- uses fixed size -> FIX <-" <<
std::endl;
// const iores read_data(void** dbuf, size_t& dbsize, data_tok* const dtok)
- rhm::journal::iores res = jc->read_next_data_record(&buff, buffSize,
&dtokp);
+ rhm::journal::iores res = jc->read_data_record(&buff, buffSize,
&dtokp);
readSize = dtokp.dsize();
assert(readSize < buffSize); /// fail safe for hack...
Modified: store/trunk/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-09-25 20:25:49 UTC (rev 947)
+++ store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-09-27 15:18:31 UTC (rev 948)
@@ -32,11 +32,310 @@
#include <jrnl/deq_rec.hpp>
+#include <assert.h>
+#include <iomanip>
+#include <sstream>
+#include <jrnl/jerrno.hpp>
+
namespace rhm
{
namespace journal
{
+deq_rec::deq_rec():
+ _deq_hdr(),
+ _xid(),
+ _deq_tail()
+{}
+deq_rec::deq_rec(const u_int64_t rid, const u_int64_t drid, const std::string xid):
+ _deq_hdr(RHM_JDAT_DEQ_MAGIC, rid, drid, xid.size(), RHM_JDAT_VERSION),
+ _xid(xid),
+ _deq_tail(_deq_hdr._hdr)
+{}
+
+deq_rec::~deq_rec()
+{}
+
+u_int32_t
+deq_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) throw
(jexception)
+{
+ assert(wptr != NULL);
+ assert(max_size_dblks > 0);
+ assert(_xid.size() == _deq_hdr._xidsize);
+
+ size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
+ size_t rem = max_size_dblks * JRNL_DBLK_SIZE;
+ size_t wr_cnt = 0;
+ if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages)
+ {
+ if (size_dblks(rec_size()) - rec_offs_dblks > max_size_dblks) // Further split
required
+ {
+ rec_offs -= sizeof(_deq_hdr);
+ size_t wsize = _deq_hdr._xidsize > rec_offs ? _deq_hdr._xidsize - rec_offs
: 0;
+ size_t wsize2 = wsize;
+ if (wsize)
+ {
+ if (wsize > rem)
+ wsize = rem;
+ ::memcpy(wptr, (const char*)_xid.c_str() + rec_offs, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ rec_offs -= _deq_hdr._xidsize - wsize2;
+ if (rem)
+ {
+ wsize = sizeof(_deq_tail) > rec_offs ? sizeof(_deq_tail) - rec_offs :
0;
+ wsize2 = wsize;
+ if (wsize)
+ {
+ if (wsize > rem)
+ wsize = rem;
+ ::memcpy((char*)wptr + wr_cnt, (char*)&_deq_tail + rec_offs,
wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ rec_offs -= sizeof(_deq_tail) - wsize2;
+ }
+ assert(rem == 0);
+ assert(rec_offs == 0);
+ }
+ else // No further split required
+ {
+ rec_offs -= sizeof(_deq_hdr);
+ size_t wsize = _deq_hdr._xidsize > rec_offs ? _deq_hdr._xidsize - rec_offs
: 0;
+ if (wsize)
+ {
+ ::memcpy(wptr, _xid.c_str() + rec_offs, wsize);
+ wr_cnt += wsize;
+ }
+ rec_offs -= _deq_hdr._xidsize - wsize;
+ wsize = sizeof(_deq_tail) > rec_offs ? sizeof(_deq_tail) - rec_offs : 0;
+ if (wsize)
+ {
+ ::memcpy((char*)wptr + wr_cnt, (char*)&_deq_tail + rec_offs, wsize);
+ wr_cnt += wsize;
+#ifdef RHM_CLEAN
+ ::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR,
+ (size_dblks(rec_size()) * JRNL_DBLK_SIZE) -
+ (rec_offs_dblks * JRNL_DBLK_SIZE) - wr_cnt);
+#endif
+ }
+ rec_offs -= sizeof(_deq_tail) - wsize;
+ assert(rec_offs == 0);
+ }
+ }
+ else // Start at beginning of data record
+ {
+ // Assumption: the header will always fit into the first dblk
+ ::memcpy(wptr, (void*)&_deq_hdr, sizeof(_deq_hdr));
+ wr_cnt = sizeof(_deq_hdr);
+ if (size_dblks(rec_size()) > max_size_dblks) // Split required - can only
occur with xid
+ {
+ size_t wsize;
+ rem -= sizeof(_deq_hdr);
+ if (rem)
+ {
+ wsize = rem >= _deq_hdr._xidsize ? _deq_hdr._xidsize : rem;
+ ::memcpy((char*)wptr + wr_cnt, _xid.c_str(), wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ if (rem)
+ {
+ wsize = rem >= sizeof(_deq_tail) ? sizeof(_deq_tail) : rem;
+ ::memcpy((char*)wptr + wr_cnt, (void*)&_deq_tail, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ assert(rem == 0);
+ }
+ else // No split required
+ {
+ if (_deq_hdr._xidsize)
+ {
+ ::memcpy((char*)wptr + wr_cnt, _xid.c_str(), _deq_hdr._xidsize);
+ wr_cnt += _deq_hdr._xidsize;
+ ::memcpy((char*)wptr + wr_cnt, (void*)&_deq_tail,
sizeof(_deq_tail));
+ wr_cnt += sizeof(_deq_tail);
+ }
+#ifdef RHM_CLEAN
+ ::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR,
+ (size_dblks(rec_size()) * JRNL_DBLK_SIZE) - wr_cnt);
+#endif
+ }
+ }
+ return size_dblks(wr_cnt);
+}
+
+u_int32_t
+deq_rec::decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t
max_size_dblks)
+ throw (jexception)
+{
+ assert(rptr != NULL);
+ assert(max_size_dblks > 0);
+
+ size_t rd_cnt = 0;
+ if (rec_offs_dblks) // Continuation of record on new page
+ {
+ const u_int32_t hdr_xid_dblks = size_dblks(deq_hdr::size() + _deq_hdr._xidsize);
+ const u_int32_t hdr_xid_tail_dblks = size_dblks(enq_hdr::size() +
_deq_hdr._xidsize +
+ rec_tail::size());
+ const size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
+
+ if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
+ {
+ // Remainder of xid fits within this page
+ if (rec_offs - deq_hdr::size() < _deq_hdr._xidsize)
+ {
+ // Part of xid still outstanding, copy remainder of xid and tail
+ const size_t xid_offs = rec_offs - deq_hdr::size();
+ const size_t xid_rem = _deq_hdr._xidsize - xid_offs;
+ _xid.append((char*)rptr, xid_rem);
+ rd_cnt = xid_rem;
+ ::memcpy((void*)&_deq_tail, ((char*)rptr + rd_cnt),
sizeof(_deq_tail));
+ chk_tail();
+ rd_cnt += sizeof(_deq_tail);
+ }
+ else
+ {
+ // Tail or part of tail only outstanding, complete tail
+ const size_t tail_offs = rec_offs - deq_hdr::size() - _deq_hdr._xidsize;
+ const size_t tail_rem = rec_tail::size() - tail_offs;
+ ::memcpy((char*)&_deq_tail + tail_offs, rptr, tail_rem);
+ chk_tail();
+ rd_cnt = tail_rem;
+ }
+ }
+ else if (hdr_xid_dblks - rec_offs_dblks <= max_size_dblks)
+ {
+ // Remainder of xid fits within this page, tail split
+ const size_t xid_offs = rec_offs - deq_hdr::size();
+ const size_t xid_rem = _deq_hdr._xidsize - xid_offs;
+ _xid.append((char*)rptr, xid_rem);
+ rd_cnt += xid_rem;
+ const size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+ if (tail_rem)
+ {
+ ::memcpy((void*)&_deq_tail, ((char*)rptr + xid_rem), tail_rem);
+ rd_cnt += tail_rem;
+ }
+ }
+ else
+ {
+ // Remainder of xid split
+ const size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE);
+ _xid.append((char*)rptr, xid_cp_size);
+ rd_cnt += xid_cp_size;
+ }
+ }
+ else // Start of record
+ {
+ // Get and check header
+ _deq_hdr._hdr.copy(h);
+ rd_cnt = sizeof(hdr);
+ _deq_hdr._deq_rid = *(u_int64_t*)((char*)rptr + rd_cnt);
+ rd_cnt += sizeof(u_int64_t);
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ rd_cnt += sizeof(u_int32_t); // Filler 0
+#endif
+ _deq_hdr._xidsize = *(size_t*)((char*)rptr + rd_cnt);
+ rd_cnt = _deq_hdr.size();
+ chk_hdr();
+ if (_deq_hdr._xidsize)
+ {
+ const u_int32_t hdr_xid_dblks = size_dblks(deq_hdr::size() +
_deq_hdr._xidsize);
+ const u_int32_t hdr_xid_tail_dblks = size_dblks(enq_hdr::size() +
_deq_hdr._xidsize +
+ rec_tail::size());
+
+ // Check if record (header + xid + tail) fits within this page, we can check
the
+ // tail before the expense of copying data to memory
+ if (hdr_xid_tail_dblks <= max_size_dblks)
+ {
+ // Entire header, xid and tail fits within this page
+ ::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt +
_deq_hdr._xidsize,
+ sizeof(_deq_tail));
+ chk_tail();
+ _xid.assign((char*)rptr + rd_cnt, _deq_hdr._xidsize);
+ rd_cnt += _deq_hdr._xidsize + sizeof(_deq_tail);
+ }
+ else if (hdr_xid_dblks <= max_size_dblks)
+ {
+ // Entire header and xid fit within this page, tail split
+ _xid.assign((char*)rptr + rd_cnt, _deq_hdr._xidsize);
+ rd_cnt += _deq_hdr._xidsize;
+ const size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+ if (tail_rem)
+ {
+ ::memcpy((void*)&_deq_tail, (char*)rptr + rd_cnt, tail_rem);
+ rd_cnt += tail_rem;
+ }
+ }
+ else
+ {
+ // Header fits within this page, xid split
+ const size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+ _xid.assign((char*)rptr + rd_cnt, xid_cp_size);
+ rd_cnt += xid_cp_size;
+ }
+ }
+ }
+ return size_dblks(rd_cnt);
+}
+
+std::string&
+deq_rec::str(std::string& str) const
+{
+ std::stringstream ss;
+ ss << "deq_rec: m=" << _deq_hdr._hdr._magic;
+ ss << " v=" << (int)_deq_hdr._hdr._version;
+ ss << " rid=" << _deq_hdr._hdr._rid;
+ ss << " dren=" << _deq_hdr._deq_rid;
+ if (_deq_hdr._xidsize)
+ ss << " xid=\"" << _xid << "\"";
+ str.append(ss.str());
+ return str;
+}
+
+const size_t
+deq_rec::xid_size() const
+{
+ return _deq_hdr._xidsize;
+}
+
+const size_t
+deq_rec::rec_size() const
+{
+ return deq_hdr::size() + (_deq_hdr._xidsize ? _deq_hdr._xidsize + rec_tail::size() :
0);
+}
+
+void
+deq_rec::chk_hdr() const throw (jexception)
+{
+ jrec::chk_hdr(_deq_hdr._hdr);
+ if (_deq_hdr._hdr._magic != RHM_JDAT_DEQ_MAGIC)
+ {
+ std::stringstream ss;
+ ss << std::hex << std::setfill('0');
+ ss << "deq magic: rid=0x" << std::setw(16) <<
_deq_hdr._hdr._rid;
+ ss << ": expected=0x" << std::setw(8) <<
RHM_JDAT_DEQ_MAGIC;
+ ss << " read=0x" << std::setw(2) <<
(int)_deq_hdr._hdr._magic;
+ throw jexception(jerrno::JERR_DREC_INVRHDR, ss.str(), "deq_rec",
"chk_hdr");
+ }
+}
+
+void
+deq_rec::chk_hdr(u_int64_t rid) const throw (jexception)
+{
+ chk_hdr();
+ jrec::chk_rid(_deq_hdr._hdr, rid);
+}
+
+void
+deq_rec::chk_tail() const throw (jexception)
+{
+ jrec::chk_tail(_deq_tail, _deq_hdr._hdr);
+}
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/deq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-09-25 20:25:49 UTC (rev 947)
+++ store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-09-27 15:18:31 UTC (rev 948)
@@ -54,6 +54,29 @@
*/
class deq_rec : public jrec
{
+ private:
+ deq_hdr _deq_hdr; ///< Dequeue header
+ std::string _xid; ///< XID - empty string means no XID is set
+ rec_tail _deq_tail; ///< Record tail, only encoded if XID is present
+
+ public:
+ deq_rec();
+ deq_rec(const u_int64_t rid, const u_int64_t drid, const std::string xid);
+ ~deq_rec();
+
+ u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+ throw (jexception);
+ u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t
max_size_dblks)
+ throw (jexception);
+ std::string& str(std::string& str) const;
+ inline const size_t data_size() const { return 0; } // This record never carries
data
+ const size_t xid_size() const;
+ const size_t rec_size() const;
+
+ private:
+ virtual void chk_hdr() const throw (jexception) = 0;
+ virtual void chk_hdr(u_int64_t rid) const throw (jexception) = 0;
+ virtual void chk_tail() const throw (jexception) = 0;
}; // class deq_rec
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/dtx_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/dtx_rec.cpp 2007-09-25 20:25:49 UTC (rev 947)
+++ store/trunk/cpp/lib/jrnl/dtx_rec.cpp 2007-09-27 15:18:31 UTC (rev 948)
@@ -32,11 +32,308 @@
#include <jrnl/dtx_rec.hpp>
+#include <assert.h>
+#include <iomanip>
+#include <sstream>
+#include <jrnl/jerrno.hpp>
+
namespace rhm
{
namespace journal
{
+dtx_rec::dtx_rec():
+ _dtx_hdr(),
+ _xid(),
+ _dtx_tail()
+{}
+dtx_rec::dtx_rec(const u_int32_t magic, const u_int64_t rid, const std::string xid):
+ _dtx_hdr(magic, rid, xid.size(), RHM_JDAT_VERSION),
+ _xid(xid),
+ _dtx_tail(_dtx_hdr._hdr)
+{}
+
+dtx_rec::~dtx_rec()
+{}
+
+u_int32_t
+dtx_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) throw
(jexception)
+{
+ assert(wptr != NULL);
+ assert(max_size_dblks > 0);
+ assert(_xid.size() == _dtx_hdr._xidsize);
+ assert(_dtx_hdr._xidsize > 0);
+
+ size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
+ size_t rem = max_size_dblks * JRNL_DBLK_SIZE;
+ size_t wr_cnt = 0;
+ if (rec_offs_dblks) // Continuation of split dequeue record (over 2 or more pages)
+ {
+ if (size_dblks(rec_size()) - rec_offs_dblks > max_size_dblks) // Further split
required
+ {
+ rec_offs -= sizeof(_dtx_hdr);
+ size_t wsize = _dtx_hdr._xidsize > rec_offs ? _dtx_hdr._xidsize - rec_offs
: 0;
+ size_t wsize2 = wsize;
+ if (wsize)
+ {
+ if (wsize > rem)
+ wsize = rem;
+ ::memcpy(wptr, (const char*)_xid.c_str() + rec_offs, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ rec_offs -= _dtx_hdr._xidsize - wsize2;
+ if (rem)
+ {
+ wsize = sizeof(_dtx_tail) > rec_offs ? sizeof(_dtx_tail) - rec_offs :
0;
+ wsize2 = wsize;
+ if (wsize)
+ {
+ if (wsize > rem)
+ wsize = rem;
+ ::memcpy((char*)wptr + wr_cnt, (char*)&_dtx_tail + rec_offs,
wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ rec_offs -= sizeof(_dtx_tail) - wsize2;
+ }
+ assert(rem == 0);
+ assert(rec_offs == 0);
+ }
+ else // No further split required
+ {
+ rec_offs -= sizeof(_dtx_hdr);
+ size_t wsize = _dtx_hdr._xidsize > rec_offs ? _dtx_hdr._xidsize - rec_offs
: 0;
+ if (wsize)
+ {
+ ::memcpy(wptr, _xid.c_str() + rec_offs, wsize);
+ wr_cnt += wsize;
+ }
+ rec_offs -= _dtx_hdr._xidsize - wsize;
+ wsize = sizeof(_dtx_tail) > rec_offs ? sizeof(_dtx_tail) - rec_offs : 0;
+ if (wsize)
+ {
+ ::memcpy((char*)wptr + wr_cnt, (char*)&_dtx_tail + rec_offs, wsize);
+ wr_cnt += wsize;
+#ifdef RHM_CLEAN
+ ::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR,
+ (size_dblks(rec_size()) * JRNL_DBLK_SIZE) -
+ (rec_offs_dblks * JRNL_DBLK_SIZE) - wr_cnt);
+#endif
+ }
+ rec_offs -= sizeof(_dtx_tail) - wsize;
+ assert(rec_offs == 0);
+ }
+ }
+ else // Start at beginning of data record
+ {
+ // Assumption: the header will always fit into the first dblk
+ ::memcpy(wptr, (void*)&_dtx_hdr, sizeof(_dtx_hdr));
+ wr_cnt = sizeof(_dtx_hdr);
+ if (size_dblks(rec_size()) > max_size_dblks) // Split required
+ {
+ size_t wsize;
+ rem -= sizeof(_dtx_hdr);
+ if (rem)
+ {
+ wsize = rem >= _dtx_hdr._xidsize ? _dtx_hdr._xidsize : rem;
+ ::memcpy((char*)wptr + wr_cnt, _xid.c_str(), wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ if (rem)
+ {
+ wsize = rem >= sizeof(_dtx_tail) ? sizeof(_dtx_tail) : rem;
+ ::memcpy((char*)wptr + wr_cnt, (void*)&_dtx_tail, wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ assert(rem == 0);
+ }
+ else // No split required
+ {
+ if (_dtx_hdr._xidsize)
+ {
+ ::memcpy((char*)wptr + wr_cnt, _xid.c_str(), _dtx_hdr._xidsize);
+ wr_cnt += _dtx_hdr._xidsize;
+ ::memcpy((char*)wptr + wr_cnt, (void*)&_dtx_tail,
sizeof(_dtx_tail));
+ wr_cnt += sizeof(_dtx_tail);
+ }
+#ifdef RHM_CLEAN
+ ::memset((char*)wptr + wr_cnt, RHM_CLEAN_CHAR,
+ (size_dblks(rec_size()) * JRNL_DBLK_SIZE) - wr_cnt);
+#endif
+ }
+ }
+ return size_dblks(wr_cnt);
+}
+
+u_int32_t
+dtx_rec::decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t
max_size_dblks)
+ throw (jexception)
+{
+ assert(rptr != NULL);
+ assert(max_size_dblks > 0);
+
+ size_t rd_cnt = 0;
+ if (rec_offs_dblks) // Continuation of record on new page
+ {
+ const u_int32_t hdr_xid_dblks = size_dblks(deq_hdr::size() + _dtx_hdr._xidsize);
+ const u_int32_t hdr_xid_tail_dblks = size_dblks(enq_hdr::size() +
_dtx_hdr._xidsize +
+ rec_tail::size());
+ const size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
+
+ if (hdr_xid_tail_dblks - rec_offs_dblks <= max_size_dblks)
+ {
+ // Remainder of xid fits within this page
+ if (rec_offs - deq_hdr::size() < _dtx_hdr._xidsize)
+ {
+ // Part of xid still outstanding, copy remainder of xid and tail
+ const size_t xid_offs = rec_offs - deq_hdr::size();
+ const size_t xid_rem = _dtx_hdr._xidsize - xid_offs;
+ _xid.append((char*)rptr, xid_rem);
+ rd_cnt = xid_rem;
+ ::memcpy((void*)&_dtx_tail, ((char*)rptr + rd_cnt),
sizeof(_dtx_tail));
+ chk_tail();
+ rd_cnt += sizeof(_dtx_tail);
+ }
+ else
+ {
+ // Tail or part of tail only outstanding, complete tail
+ const size_t tail_offs = rec_offs - deq_hdr::size() - _dtx_hdr._xidsize;
+ const size_t tail_rem = rec_tail::size() - tail_offs;
+ ::memcpy((char*)&_dtx_tail + tail_offs, rptr, tail_rem);
+ chk_tail();
+ rd_cnt = tail_rem;
+ }
+ }
+ else if (hdr_xid_dblks - rec_offs_dblks <= max_size_dblks)
+ {
+ // Remainder of xid fits within this page, tail split
+ const size_t xid_offs = rec_offs - deq_hdr::size();
+ const size_t xid_rem = _dtx_hdr._xidsize - xid_offs;
+ _xid.append((char*)rptr, xid_rem);
+ rd_cnt += xid_rem;
+ const size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+ if (tail_rem)
+ {
+ ::memcpy((void*)&_dtx_tail, ((char*)rptr + xid_rem), tail_rem);
+ rd_cnt += tail_rem;
+ }
+ }
+ else
+ {
+ // Remainder of xid split
+ const size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE);
+ _xid.append((char*)rptr, xid_cp_size);
+ rd_cnt += xid_cp_size;
+ }
+ }
+ else // Start of record
+ {
+ // Get and check header
+ _dtx_hdr._hdr.copy(h);
+ rd_cnt = sizeof(hdr);
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ rd_cnt += sizeof(u_int32_t); // Filler 0
+#endif
+ _dtx_hdr._xidsize = *(size_t*)((char*)rptr + rd_cnt);
+ rd_cnt = _dtx_hdr.size();
+ chk_hdr();
+ const u_int32_t hdr_xid_dblks = size_dblks(deq_hdr::size() + _dtx_hdr._xidsize);
+ const u_int32_t hdr_xid_tail_dblks = size_dblks(enq_hdr::size() +
_dtx_hdr._xidsize +
+ rec_tail::size());
+
+ // Check if record (header + xid + tail) fits within this page, we can check the
+ // tail before the expense of copying data to memory
+ if (hdr_xid_tail_dblks <= max_size_dblks)
+ {
+ // Entire header, xid and tail fits within this page
+ ::memcpy((void*)&_dtx_tail, (char*)rptr + rd_cnt + _dtx_hdr._xidsize,
+ sizeof(_dtx_tail));
+ chk_tail();
+ _xid.assign((char*)rptr + rd_cnt, _dtx_hdr._xidsize);
+ rd_cnt += _dtx_hdr._xidsize + sizeof(_dtx_tail);
+ }
+ else if (hdr_xid_dblks <= max_size_dblks)
+ {
+ // Entire header and xid fit within this page, tail split
+ _xid.assign((char*)rptr + rd_cnt, _dtx_hdr._xidsize);
+ rd_cnt += _dtx_hdr._xidsize;
+ const size_t tail_rem = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+ if (tail_rem)
+ {
+ ::memcpy((void*)&_dtx_tail, (char*)rptr + rd_cnt, tail_rem);
+ rd_cnt += tail_rem;
+ }
+ }
+ else
+ {
+ // Header fits within this page, xid split
+ const size_t xid_cp_size = (max_size_dblks * JRNL_DBLK_SIZE) - rd_cnt;
+ _xid.assign((char*)rptr + rd_cnt, xid_cp_size);
+ rd_cnt += xid_cp_size;
+ }
+ }
+ return size_dblks(rd_cnt);
+}
+
+std::string&
+dtx_rec::str(std::string& str) const
+{
+ std::stringstream ss;
+ if (_dtx_hdr._hdr._magic == RHM_JDAT_DTXA_MAGIC)
+ ss << "dtxa_rec: m=" << _dtx_hdr._hdr._magic;
+ else
+ ss << "dtxc_rec: m=" << _dtx_hdr._hdr._magic;
+ ss << " v=" << (int)_dtx_hdr._hdr._version;
+ ss << " rid=" << _dtx_hdr._hdr._rid;
+ ss << " xid=\"" << _xid << "\"";
+ str.append(ss.str());
+ return str;
+}
+
+const size_t
+dtx_rec::xid_size() const
+{
+ return _dtx_hdr._xidsize;
+}
+
+const size_t
+dtx_rec::rec_size() const
+{
+ return deq_hdr::size() + _dtx_hdr._xidsize + rec_tail::size();
+}
+
+void
+dtx_rec::chk_hdr() const throw (jexception)
+{
+ jrec::chk_hdr(_dtx_hdr._hdr);
+ if (_dtx_hdr._hdr._magic != RHM_JDAT_DTXA_MAGIC && _dtx_hdr._hdr._magic !=
RHM_JDAT_DTXC_MAGIC)
+ {
+ std::stringstream ss;
+ ss << std::hex << std::setfill('0');
+ ss << "dtx magic: rid=0x" << std::setw(16) <<
_dtx_hdr._hdr._rid;
+ ss << ": expected=(0x" << std::setw(8) <<
RHM_JDAT_DTXA_MAGIC;
+ ss << " or 0x" << RHM_JDAT_DTXC_MAGIC;
+ ss << ") read=0x" << std::setw(2) <<
(int)_dtx_hdr._hdr._magic;
+ throw jexception(jerrno::JERR_DREC_INVRHDR, ss.str(), "dtx_rec",
"chk_hdr");
+ }
+}
+
+void
+dtx_rec::chk_hdr(u_int64_t rid) const throw (jexception)
+{
+ chk_hdr();
+ jrec::chk_rid(_dtx_hdr._hdr, rid);
+}
+
+void
+dtx_rec::chk_tail() const throw (jexception)
+{
+ jrec::chk_tail(_dtx_tail, _dtx_hdr._hdr);
+}
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/dtx_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/dtx_rec.hpp 2007-09-25 20:25:49 UTC (rev 947)
+++ store/trunk/cpp/lib/jrnl/dtx_rec.hpp 2007-09-27 15:18:31 UTC (rev 948)
@@ -54,6 +54,29 @@
*/
class dtx_rec : public jrec
{
+ private:
+ dtx_hdr _dtx_hdr; ///< DTX header
+ std::string _xid; ///< XID
+ rec_tail _dtx_tail; ///< Record tail
+
+ public:
+ dtx_rec();
+ dtx_rec(const u_int32_t magic, const u_int64_t rid, const std::string xid);
+ ~dtx_rec();
+
+ u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+ throw (jexception);
+ u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t
max_size_dblks)
+ throw (jexception);
+ std::string& str(std::string& str) const;
+ inline const size_t data_size() const { return 0; } // This record never carries
data
+ const size_t xid_size() const;
+ const size_t rec_size() const;
+
+ private:
+ void chk_hdr() const throw (jexception);
+ void chk_hdr(u_int64_t rid) const throw (jexception);
+ void chk_tail() const throw (jexception);
}; // class dtx_rec
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-09-25 20:25:49 UTC (rev 947)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-09-27 15:18:31 UTC (rev 948)
@@ -47,6 +47,7 @@
enq_rec::enq_rec():
jrec(), // superclass
_enq_hdr(RHM_JDAT_ENQ_MAGIC, 0, 0, 0, RHM_JDAT_VERSION),
+ _xid(),
_data(NULL),
_buff(NULL),
_enq_tail(_enq_hdr._hdr),
@@ -59,12 +60,14 @@
enq_rec::enq_rec(const u_int64_t rid, const void* const dbuf, const size_t dlen):
jrec(), // superclass
_enq_hdr(RHM_JDAT_ENQ_MAGIC, rid, 0, dlen, RHM_JDAT_VERSION),
+ _xid(),
_data(dbuf),
_buff(NULL),
_enq_tail(_enq_hdr._hdr),
_max_data_size(0),
_data_size(dlen),
- _rec_size(size_dblks(dlen + enq_hdr::size() + rec_tail::size()) *
JRNL_DBLK_SIZE)
+ _rec_size(size_dblks(enq_hdr::size() + _enq_hdr._xidsize + dlen +
rec_tail::size()) *
+ JRNL_DBLK_SIZE)
{}
// Constructor used for read operations, where buf contains preallocated space
@@ -72,6 +75,7 @@
enq_rec::enq_rec(void* const buf, const size_t bufsize):
jrec(), // superclass
_enq_hdr(RHM_JDAT_ENQ_MAGIC, 0, 0, bufsize, RHM_JDAT_VERSION),
+ _xid(),
_data(NULL),
_buff(buf),
_enq_tail(_enq_hdr._hdr),
@@ -127,6 +131,7 @@
{
assert(wptr != NULL);
assert(max_size_dblks > 0);
+ assert(_xid.size() == _enq_hdr._xidsize);
size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
size_t rem = max_size_dblks * JRNL_DBLK_SIZE;
@@ -135,19 +140,18 @@
{
if (size_dblks(rec_size()) - rec_offs_dblks > max_size_dblks) // Further split
required
{
- // TODO: The header will never be split as it will always be less than one
dblk.
- // Remove this check...
- size_t wsize = sizeof(_enq_hdr) > rec_offs ? sizeof(_enq_hdr) - rec_offs :
0;
+ rec_offs -= sizeof(_enq_hdr);
+ size_t wsize = _enq_hdr._xidsize > rec_offs ? _enq_hdr._xidsize - rec_offs
: 0;
size_t wsize2 = wsize;
if (wsize)
{
if (wsize > rem)
wsize = rem;
- ::memcpy(wptr, (char*)&_enq_hdr + rec_offs, wsize);
+ ::memcpy(wptr, (const char*)_xid.c_str() + rec_offs, wsize);
wr_cnt = wsize;
rem -= wsize;
}
- rec_offs -= sizeof(_enq_hdr) - wsize2;
+ rec_offs -= _enq_hdr._xidsize - wsize2;
if (rem)
{
wsize = _data_size > rec_offs ? _data_size - rec_offs : 0;
@@ -181,15 +185,14 @@
}
else // No further split required
{
- // TODO: The header will never be split as it will always be less than one
dblk.
- // Remove this check...
- size_t wsize = sizeof(_enq_hdr) > rec_offs ? sizeof(_enq_hdr) - rec_offs :
0;
+ rec_offs -= sizeof(_enq_hdr);
+ size_t wsize = _enq_hdr._xidsize > rec_offs ? _enq_hdr._xidsize - rec_offs
: 0;
if (wsize)
{
- ::memcpy(wptr, (char*)&_enq_hdr + rec_offs, wsize);
- wr_cnt = wsize;
+ ::memcpy(wptr, _xid.c_str() + rec_offs, wsize);
+ wr_cnt += wsize;
}
- rec_offs -= sizeof(_enq_hdr) - wsize;
+ rec_offs -= _enq_hdr._xidsize - wsize;
wsize = _data_size > rec_offs ? _data_size - rec_offs : 0;
if (wsize)
{
@@ -213,14 +216,22 @@
}
else // Start at beginning of data record
{
+ // Assumption: the header will always fit into the first dblk
+ ::memcpy(wptr, (void*)&_enq_hdr, sizeof(_enq_hdr));
+ wr_cnt = sizeof(_enq_hdr);
if (size_dblks(rec_size()) > max_size_dblks) // Split required
{
- size_t wsize = rem >= sizeof(_enq_hdr) ? sizeof(_enq_hdr) : rem;
- ::memcpy(wptr, (void*)&_enq_hdr, wsize);
- wr_cnt = wsize;
- rem -= wsize;
+ size_t wsize;
+ rem -= sizeof(_enq_hdr);
if (rem)
{
+ wsize = rem >= _enq_hdr._xidsize ? _enq_hdr._xidsize : rem;
+ ::memcpy((char*)wptr + wr_cnt, _xid.c_str(), wsize);
+ wr_cnt += wsize;
+ rem -= wsize;
+ }
+ if (rem)
+ {
wsize = rem >= _data_size ? _data_size : rem;
::memcpy((char*)wptr + wr_cnt, _data, wsize);
wr_cnt += wsize;
@@ -237,8 +248,11 @@
}
else // No split required
{
- ::memcpy(wptr, (void*)&_enq_hdr, sizeof(_enq_hdr));
- wr_cnt = sizeof(_enq_hdr);
+ if (_enq_hdr._xidsize)
+ {
+ ::memcpy((char*)wptr + wr_cnt, _xid.c_str(), _xid.size());
+ wr_cnt += _xid.size();
+ }
::memcpy((char*)wptr + wr_cnt, _data, _data_size);
wr_cnt += _data_size;
::memcpy((char*)wptr + wr_cnt, (void*)&_enq_tail, sizeof(_enq_tail));
@@ -277,7 +291,7 @@
::memcpy((char*)_buff + data_offs, rptr, data_rem);
rd_cnt += data_rem;
::memcpy((void*)&_enq_tail, ((char*)rptr + data_rem),
sizeof(_enq_tail));
- chk_tail(_enq_tail, _enq_hdr);
+ chk_tail();
rd_cnt += sizeof(_enq_tail);
}
else
@@ -286,7 +300,7 @@
const size_t tail_offs = rec_offs - enq_hdr::size() - _data_size;
const size_t tail_rem = rec_tail::size() - tail_offs;
::memcpy((char*)&_enq_tail + tail_offs, rptr, tail_rem);
- chk_tail(_enq_tail, _enq_hdr);
+ chk_tail();
rd_cnt = tail_rem;
}
}
@@ -316,32 +330,35 @@
{
// Get and check header
_enq_hdr._hdr.copy(h);
- _enq_hdr._xidsize = *(size_t*)((char*)rptr + sizeof(hdr));
- _enq_hdr._dsize = *(size_t*)((char*)rptr + sizeof(hdr) +
+ rd_cnt = sizeof(hdr);
#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
- sizeof(u_int32_t) +
+ rd_cnt += sizeof(u_int32_t); // Filler 0
#endif
- sizeof(size_t)
+ _enq_hdr._xidsize = *(size_t*)((char*)rptr + rd_cnt);
+ rd_cnt += sizeof(size_t);
#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
- + sizeof(u_int32_t)
+ rd_cnt += sizeof(u_int32_t); // Filler 0
#endif
- );
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ rd_cnt += sizeof(u_int32_t); // Filler 1
+#endif
+ _enq_hdr._dsize = *(size_t*)((char*)rptr + rd_cnt);
rd_cnt = _enq_hdr.size();
- chk_hdr(_enq_hdr);
+ chk_hdr();
_data_size = _enq_hdr._dsize;
- const u_int32_t hdr_data_dblks = size_dblks(enq_hdr::size() + _data_size);
- const u_int32_t hdr_data_tail_dblks = size_dblks(enq_hdr::size() + _data_size +
- rec_tail::size());
if (_data_size)
{
+ const u_int32_t hdr_data_dblks = size_dblks(enq_hdr::size() + _data_size);
+ const u_int32_t hdr_data_tail_dblks = size_dblks(enq_hdr::size() + _data_size
+
+ rec_tail::size());
// Check if record (header + data + tail) fits within this page, we can check
the
// tail before the expense of copying data to memory
if (hdr_data_tail_dblks <= max_size_dblks)
{
// Entire header, data and tail fits within this page
::memcpy((void*)&_enq_tail, (char*)rptr + rd_cnt + _data_size,
sizeof(_enq_tail));
- chk_tail(_enq_tail, _enq_hdr);
+ chk_tail();
::memcpy(_buff, (char*)rptr + rd_cnt, _data_size);
rd_cnt += _data_size + sizeof(_enq_tail);
}
@@ -376,6 +393,8 @@
ss << "enq_rec: m=" << _enq_hdr._hdr._magic;
ss << " v=" << (int)_enq_hdr._hdr._version;
ss << " rid=" << _enq_hdr._hdr._rid;
+ if (_xid.size())
+ ss << " xid=\"" << _xid << "\"";
ss << " len=" << _enq_hdr._dsize;
str.append(ss.str());
return str;
@@ -388,9 +407,15 @@
}
const size_t
+enq_rec::xid_size() const
+{
+ return _xid.size();
+}
+
+const size_t
enq_rec::rec_size() const
{
- return _data_size + enq_hdr::size() + rec_tail::size();
+ return enq_hdr::size() + _xid.size() + _data_size + rec_tail::size();
}
void
@@ -401,78 +426,40 @@
}
void
-enq_rec::chk_hdr(enq_hdr& hdr, u_int64_t rid, bool enq) const throw (jexception)
+enq_rec::chk_hdr() const throw (jexception)
{
- chk_hdr(hdr, enq);
- if (hdr._hdr._rid != rid)
+ jrec::chk_hdr(_enq_hdr._hdr);
+ if (_enq_hdr._hdr._magic != RHM_JDAT_ENQ_MAGIC)
{
std::stringstream ss;
ss << std::hex << std::setfill('0');
- ss << "rid mismatch: expected=0x" << std::setw(16) <<
rid;
- ss << " read=0x" << std::setw(16) << hdr._hdr._rid;
+ ss << "enq magic: rid=0x" << std::setw(16) <<
_enq_hdr._hdr._rid;
+ ss << ": expected=0x" << std::setw(8) <<
RHM_JDAT_ENQ_MAGIC;
+ ss << " read=0x" << std::setw(2) <<
(int)_enq_hdr._hdr._magic;
throw jexception(jerrno::JERR_DREC_INVRHDR, ss.str(), "enq_rec",
"chk_hdr");
}
-}
-
-void
-enq_rec::chk_hdr(enq_hdr& hdr, bool enq) const throw (jexception)
-{
- if (enq)
+ if (_enq_hdr._dsize > _max_data_size)
{
- if (hdr._hdr._magic != RHM_JDAT_ENQ_MAGIC)
- {
- std::stringstream ss;
- ss << std::hex << std::setfill('0');
- ss << "enq magic: rid=0x" << std::setw(16) <<
hdr._hdr._rid;
- ss << ": expected=0x" << std::setw(8) <<
RHM_JDAT_ENQ_MAGIC;
- ss << " read=0x" << std::setw(2) <<
(int)hdr._hdr._magic;
- throw jexception(jerrno::JERR_DREC_INVRHDR, ss.str(), "enq_rec",
"chk_hdr");
- }
- }
- else
- {
- if (hdr._hdr._magic != RHM_JDAT_DEQ_MAGIC)
- {
- std::stringstream ss;
- ss << std::hex << std::setfill('0');
- ss << "deq magic: rid=0x" << std::setw(16) <<
hdr._hdr._rid;
- ss << ": expected=0x" << std::setw(8) <<
RHM_JDAT_DEQ_MAGIC;
- ss << " read=0x" << std::setw(2) <<
(int)hdr._hdr._magic;
- throw jexception(jerrno::JERR_DREC_INVRHDR, ss.str(), "enq_rec",
"chk_hdr");
- }
- }
- if (hdr._hdr._version != RHM_JDAT_VERSION)
- {
std::stringstream ss;
ss << std::hex << std::setfill('0');
- ss << "version: rid=0x" << std::setw(16) <<
hdr._hdr._rid;
- ss << ": expected=0x" << std::setw(2) <<
(int)RHM_JDAT_VERSION;
- ss << " read=0x" << std::setw(2) <<
(int)hdr._hdr._version;
- throw jexception(jerrno::JERR_DREC_INVRHDR, ss.str(), "enq_rec",
"chk_hdr");
- }
-#if defined (JRNL_LITTLE_ENDIAN)
- u_int8_t endian_flag = RHM_LENDIAN_FLAG;
-#else
- u_int8_t endian_flag = RHM_BENDIAN_FLAG;
-#endif
- if (hdr._hdr._eflag != endian_flag)
- {
- std::stringstream ss;
- ss << std::hex << std::setfill('0');
- ss << "endian_flag: rid=" << std::setw(16) <<
hdr._hdr._rid;
- ss << ": expected=0x" << std::setw(2) <<
(int)endian_flag;
- ss << " read=0x" << std::setw(2) <<
(int)hdr._hdr._eflag;
- throw jexception(jerrno::JERR_DREC_INVRHDR, ss.str(), "enq_rec",
"chk_hdr");
- }
- if (hdr._dsize > _max_data_size)
- {
- std::stringstream ss;
- ss << std::hex << std::setfill('0');
- ss << "data_size: rid=" << std::setw(16) <<
hdr._hdr._rid;
- ss << std::dec << ": buff_size=" << _max_data_size
<< " data_size=" << hdr._dsize;
+ ss << "data_size: rid=" << std::setw(16) <<
_enq_hdr._hdr._rid;
+ ss << std::dec << ": buff_size=" << _max_data_size
<< " data_size=" << _enq_hdr._dsize;
throw jexception(jerrno::JERR_DREC_BUFFSIZE, ss.str(), "enq_rec",
"chk_hdr");
}
}
+void
+enq_rec::chk_hdr(u_int64_t rid) const throw (jexception)
+{
+ chk_hdr();
+ jrec::chk_rid(_enq_hdr._hdr, rid);
+}
+
+void
+enq_rec::chk_tail() const throw (jexception)
+{
+ jrec::chk_tail(_enq_tail, _enq_hdr._hdr);
+}
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/enq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-09-25 20:25:49 UTC (rev 947)
+++ store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-09-27 15:18:31 UTC (rev 948)
@@ -56,8 +56,9 @@
{
private:
enq_hdr _enq_hdr;
- const void* _data; ///< Pointer to data to be written
- void* _buff; ///< Pointer to buffer to receive read data
+ std::string _xid; ///< XID
+ const void* _data; ///< Pointer to data to be written to disk
+ void* _buff; ///< Pointer to buffer to receive data read
from disk
rec_tail _enq_tail;
size_t _max_data_size; ///< Max buffer size for decoding into during
read
size_t _data_size; ///< Size of data (bytes)
@@ -80,6 +81,10 @@
* to receive data.
*/
enq_rec(void* const buf, const size_t bufsize);
+
+ /**
+ * \brief Destructor
+ */
~enq_rec();
// Prepare instance for use in writing data to journal
@@ -96,14 +101,16 @@
inline const size_t max_rec_size() const
{ return _max_data_size + enq_hdr::size() + rec_tail::size(); }
const size_t data_size() const;
+ const size_t xid_size() const;
const size_t rec_size() const;
inline const u_int32_t rec_size_dblks() const { return size_dblks(rec_size()); }
inline const u_int64_t rid() const { return _enq_hdr._hdr._rid; }
void set_rid(const u_int64_t rid);
private:
- void chk_hdr(enq_hdr& hdr, u_int64_t rid, bool enq = true) const throw
(jexception);
- void chk_hdr(enq_hdr& hdr, bool enq = true) const throw (jexception);
+ void chk_hdr() const throw (jexception);
+ void chk_hdr(u_int64_t rid) const throw (jexception);
+ void chk_tail() const throw (jexception);
}; // class enq_rec
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-25 20:25:49 UTC (rev 947)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-27 15:18:31 UTC (rev 948)
@@ -209,20 +209,20 @@
}
const iores
-jcntl::get_next_data_record(const void** const /*data*/, const size_t& /*dsize*/,
- const size_t& /*dsize_avail*/) throw (jexception)
+jcntl::get_data_record(const u_int64_t& /*rid*/, const size_t& /*dsize*/, const
size_t& /*dsize_avail*/,
+ const void** const /*data*/, bool /*auto_discard*/) throw (jexception)
{
return RHM_IORES_NOTIMPL;
}
const iores
-jcntl::discard_next_data_record(data_tok* /*dtokp*/) throw (jexception)
+jcntl::discard_data_record() throw (jexception)
{
return RHM_IORES_NOTIMPL;
}
const iores
-jcntl::read_next_data_record(void* const dbuf, const size_t dbsize, data_tok* const
dtokp)
+jcntl::read_data_record(void* const dbuf, const size_t dbsize, data_tok* const dtokp)
throw (jexception)
{
check_rstatus("read_data");
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-25 20:25:49 UTC (rev 947)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-27 15:18:31 UTC (rev 948)
@@ -137,12 +137,13 @@
rmgr _rmgr; ///< Read page manager which manages AIO
wmgr _wmgr; ///< Write page manager which manages AIO
rcvdat _rcvdat; ///< Recovery data used for recovery
- static const std::string no_xid;
std::deque<rhm::journal::data_tok*> _aio_rd_cmpl_dtok_list; ///<
Internally mamanged deque
std::deque<rhm::journal::data_tok*> _aio_wr_cmpl_dtok_list; ///<
Internally mamanged deque
public:
+ static const std::string no_xid;
+
/**
* \brief Journal constructor.
*
@@ -281,13 +282,16 @@
*
* Example: If a write of 99 kB is divided into three equal parts, then the
following states
* and returns would characterize a successful operation:
+ * <pre>
* dtok. dtok. dtok.
- * operation return wstate() dsize() written() comment
+ * Pperation Return wstate() dsize() written() Comment
+ *
-----------------+--------+--------+-------+---------+------------------------------------
* NONE 0 0 Value of dtok before op
* edr(99000, 33000) SUCCESS ENQ_PART 99000 33000 Enqueue part 1
* edr(99000, 33000) AIO_WAIT ENQ_PART 99000 50000 Enqueue part 2, not
completed
* edr(99000, 33000) SUCCESS ENQ_PART 99000 66000 Enqueue part 2 again
* edr(99000, 33000) SUCCESS ENQ 99000 99000 Enqueue part 3
+ * </pre>
*
* \param data_buff Pointer to data to be enqueued for this enqueue operation.
* \param tot_data_len Total data length.
@@ -306,27 +310,63 @@
/**
* \brief Retrieve details of next record to be read without consuming the
record.
*
- * Retrieve details of next record to be read without consuming the record. A
pointer to
- * the data is returned, along with the data size and available data size. If a
large record
- * should span more than one page of the read page cache, then if the following
page is still
- * waiting for AIO return, then only a portion of the total data will be available
for
- * consumption. If dsize_avail < dsize, then a subsequent call will update
dsize_avail if
- * more pages have returned from AIO.
+ * Retrieve information about current read record. A pointer to the data is
returned, along
+ * with the data size and available data size. Data is considered
"available" when the AIO
+ * operations to fill page-cache pages from disk have returned, and is ready for
consumption.
*
- * Note that using this function does not consume the record, it merely makes the
content
- * of the read page cache available through a pointer. To mark this record as
consumed,
- * discard_next_data_record() must be called.
+ * If <i>dsize_avail</i> < <i>dsize</i>, then not
all of the data is available or part of
+ * the data is in non-contiguous memory, and a subsequent call will update both
the pointer
+ * and <i>dsize_avail</i> if more pages have returned from AIO.
*
- * \param data Pointer to data pointer which will point to the first byte of the
next record
- * data.
+ * The <i>dsize_avail</i> parameter will return the amount of data
from this record that is
+ * available in the page cache as contiguous memory, even if it spans page cache
boundaries.
+ * However, if a record spans the end of the page cache and continues at the
beginning, even
+ * if both parts are ready for consumption, then this must be divided into at
least two
+ * get_data_record() operations, as the data is contained in at least two
non-contiguous
+ * segments of the page cache.
+ *
+ * Once all the available data for a record is exposed, it can not be read again
using
+ * this function. It must be consumed prior to getting the next record. This can
be done by
+ * calling discard_data_record() or read_data_record(). However, if parameter
+ * <i>auto_discard</i> is set to
<b><i>true</i></b>, then this record will be automatically
+ * consumed when the entire record has become available whithout having to
explicitly call
+ * discard_next_data_record() or read_data_record().
+ *
+ * If the current record is an open transactional record, then it cannot be read
until it is
+ * committed. If it is aborted, it can never be read. Under this condition,
get_data_record()
+ * will return RHM_IORES_TXPENDING, the data pointer will be set to NULL and all
data
+ * lengths will be set to 0.
+ *
+ * Example: Read a record of 30k. Assume a read page cache of 10 pages of size 10k
starting
+ * at address base_ptr (page0 = base_ptr, page1 = page_ptr+10k, etc.). The first
15k of
+ * the record falls at the end of the page cache, the remaining 15k folded to the
beginning.
+ * The current page (page 8) containing 5k is available, the remaining pages which
contain
+ * this record are pending AIO return:
+ * <pre>
+ * call dsize
+ * no. dsize avail data ptr Return Comment
+ *
----+-----+-----+------------+--------+--------------------------------------------------
+ * 1 30k 5k base_ptr+85k SUCCESS Initial call, read first 5k
+ * 2 30k 0k base_ptr+90k AIO_WAIT AIO still pending; no further pages
avail
+ * 3 30k 10k base_ptr+90k SUCCESS AIO now returned; now read till end of
page cache
+ * 4 30k 15k base_ptr SUCCESS data_ptr now pointing to start of page
cache
+ * </pre>
+ *
+ * \param rid Reference that returns the record ID (rid)
* \param dsize Reference that returns the total data size of the record data .
* \param dsize_avail Reference that returns the amount of the data that is
available for
* consumption.
+ * \param data Pointer to data pointer which will point to the first byte of the
next record
+ * data.
+ * \param auto_discard If <b><i>true</i></b>,
automatically discard the record being read if
+ * the entire record is avialable (i.e. dsize == dsize_avail). Otherwise
+ * discard_next_data_record() must be explicitly called.
*
* \exception TODO
*/
- const iores get_next_data_record(const void** const data, const size_t&
dsize,
- const size_t& dsize_avail) throw (jexception);
+ const iores get_data_record(const u_int64_t& rid, const size_t& dsize,
+ const size_t& dsize_avail, const void** const data, bool auto_discard
= false)
+ throw (jexception);
/**
* \brief Discard (skip) next record to be read without reading or retrieving it.
@@ -335,7 +375,7 @@
*
* \exception TODO
*/
- const iores discard_next_data_record(data_tok* dtokp) throw (jexception);
+ const iores discard_data_record() throw (jexception);
/**
* \brief Reads data from the journal.
@@ -348,8 +388,8 @@
*
* \exception TODO
*/
- const iores read_next_data_record(void* const dbuf, const size_t dbsize,
- data_tok* const dtokp) throw (jexception);
+ const iores read_data_record(void* const dbuf, const size_t dbsize, data_tok*
const dtokp)
+ throw (jexception);
/**
* \brief Dequeues (marks as no longer needed) data record in journal.
Modified: store/trunk/cpp/lib/jrnl/jrec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jrec.cpp 2007-09-25 20:25:49 UTC (rev 947)
+++ store/trunk/cpp/lib/jrnl/jrec.cpp 2007-09-27 15:18:31 UTC (rev 948)
@@ -64,21 +64,70 @@
}
void
-jrec::chk_tail(rec_tail& tail, enq_hdr& hdr) const throw (jexception)
+jrec::chk_hdr(const hdr& hdr) throw (jexception)
{
- std::stringstream ss;
- if (tail._xmagic != ~hdr._hdr._magic)
+ if (hdr._magic == 0)
{
+ std::stringstream ss;
ss << std::hex << std::setfill('0');
- ss << "magic: rid=0x" << std::setw(16) <<
hdr._hdr._rid;
- ss << ": expected=0x" << std::setw(8) <<
~hdr._hdr._magic;
+ ss << "enq magic NULL: rid=0x" << std::setw(16) <<
hdr._rid;
+ throw jexception(jerrno::JERR_DREC_INVRHDR, ss.str(), "jrec",
"chk_hdr");
+ }
+ if (hdr._version != RHM_JDAT_VERSION)
+ {
+ std::stringstream ss;
+ ss << std::hex << std::setfill('0');
+ ss << "version: rid=0x" << std::setw(16) <<
hdr._rid;
+ ss << ": expected=0x" << std::setw(2) <<
(int)RHM_JDAT_VERSION;
+ ss << " read=0x" << std::setw(2) <<
(int)hdr._version;
+ throw jexception(jerrno::JERR_DREC_INVRHDR, ss.str(), "jrec",
"chk_hdr");
+ }
+#if defined (JRNL_LITTLE_ENDIAN)
+ u_int8_t endian_flag = RHM_LENDIAN_FLAG;
+#else
+ u_int8_t endian_flag = RHM_BENDIAN_FLAG;
+#endif
+ if (hdr._eflag != endian_flag)
+ {
+ std::stringstream ss;
+ ss << std::hex << std::setfill('0');
+ ss << "endian_flag: rid=" << std::setw(16) <<
hdr._rid;
+ ss << ": expected=0x" << std::setw(2) <<
(int)endian_flag;
+ ss << " read=0x" << std::setw(2) << (int)hdr._eflag;
+ throw jexception(jerrno::JERR_DREC_INVRHDR, ss.str(), "jrec",
"chk_hdr");
+ }
+}
+
+void
+jrec::chk_rid(const hdr& hdr, const u_int64_t rid) throw (jexception)
+{
+ if (hdr._rid != rid)
+ {
+ std::stringstream ss;
+ ss << std::hex << std::setfill('0');
+ ss << "rid mismatch: expected=0x" << std::setw(16) <<
rid;
+ ss << " read=0x" << std::setw(16) << hdr._rid;
+ throw jexception(jerrno::JERR_DREC_INVRHDR, ss.str(), "jrec",
"chk_hdr");
+ }
+}
+
+void
+jrec::chk_tail(const rec_tail& tail, const hdr& hdr) throw (jexception)
+{
+ if (tail._xmagic != ~hdr._magic)
+ {
+ std::stringstream ss;
+ ss << std::hex << std::setfill('0');
+ ss << "magic: rid=0x" << std::setw(16) << hdr._rid;
+ ss << ": expected=0x" << std::setw(8) <<
~hdr._magic;
ss << " read=0x" << std::setw(8) << tail._xmagic;
throw jexception(jerrno::JERR_JREC_INVRTAIL, ss.str(), "jrec",
"chk_tail");
}
- if (tail._rid != hdr._hdr._rid)
+ if (tail._rid != hdr._rid)
{
+ std::stringstream ss;
ss << std::hex << std::setfill('0');
- ss << "rid: rid=0x" << std::setw(16) <<
hdr._hdr._rid;
+ ss << "rid: rid=0x" << std::setw(16) << hdr._rid;
ss << ": read=0x" << std::setw(16) << tail._rid;
throw jexception(jerrno::JERR_JREC_INVRTAIL, ss.str(), "jrec",
"chk_tail");
}
Modified: store/trunk/cpp/lib/jrnl/jrec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jrec.hpp 2007-09-25 20:25:49 UTC (rev 947)
+++ store/trunk/cpp/lib/jrnl/jrec.hpp 2007-09-27 15:18:31 UTC (rev 948)
@@ -151,15 +151,19 @@
virtual std::string& str(std::string& str) const = 0;
virtual const size_t data_size() const = 0;
+ virtual const size_t xid_size() const = 0;
virtual const size_t rec_size() const = 0;
static const u_int32_t size_dblks(const size_t size);
static const u_int32_t size_sblks(const size_t size);
static const u_int32_t size_blks(const size_t size, const size_t blksize);
protected:
- virtual void chk_hdr(enq_hdr& hdr, u_int64_t rid, bool enq = true)
- const throw (jexception) = 0;
- virtual void chk_tail(rec_tail& tail, enq_hdr& hdr) const throw
(jexception);
+ virtual void chk_hdr() const throw (jexception) = 0;
+ virtual void chk_hdr(u_int64_t rid) const throw (jexception) = 0;
+ virtual void chk_tail() const throw (jexception) = 0;
+ static void chk_hdr(const hdr& hdr) throw (jexception);
+ static void chk_rid(const hdr& hdr, u_int64_t rid) throw (jexception);
+ static void chk_tail(const rec_tail& tail, const hdr& hdr) throw
(jexception);
}; // class jrec
} // namespace journal
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-09-25 20:25:49 UTC (rev 947)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-09-27 15:18:31 UTC (rev 948)
@@ -746,7 +746,7 @@
dtp->set_wstate(rhm::journal::data_tok::ENQ);
unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->read_next_data_record(buff, MAX_MSG_SIZE,
dtp), jc,
+ while (handle_jcntl_response(jc->read_data_record(buff, MAX_MSG_SIZE, dtp),
jc,
aio_sleep_cnt, dtp));
buff[dtp->dsize()] = '\0';
return buff;
Modified: store/trunk/cpp/tests/jrnl/msg_consumer.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_consumer.cpp 2007-09-25 20:25:49 UTC (rev 947)
+++ store/trunk/cpp/tests/jrnl/msg_consumer.cpp 2007-09-27 15:18:31 UTC (rev 948)
@@ -107,7 +107,7 @@
{
if (dtokp->wstate() >= rhm::journal::data_tok::ENQ)
{
- rhm::journal::iores res = _jcntl.read_next_data_record((void*
const)_msg_buff,
+ rhm::journal::iores res = _jcntl.read_data_record((void*
const)_msg_buff,
buffSize, dtokp);
rhm::journal::data_tok::read_state rs = dtokp->rstate();
rhm::journal::data_tok::write_state ws = dtokp->wstate();
Modified: store/trunk/cpp/tests/jrnl/msg_producer.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_producer.cpp 2007-09-25 20:25:49 UTC (rev 947)
+++ store/trunk/cpp/tests/jrnl/msg_producer.cpp 2007-09-27 15:18:31 UTC (rev 948)
@@ -81,8 +81,6 @@
{
_num_msgs = numMsgs;
_auto_dequeue = auto_dequeue;
-// for (u_int32_t i=0; i<_num_msgs; i++)
-// _dtok_master_list.push_back(new rhm::journal::data_tok);
}
void
@@ -103,15 +101,6 @@
_tot_dsize = 0;
_aio_cmpl_dtok_list.clear();
_dd_dtok_list.clear();
-// for (u_int32_t i=0; i<_dtok_master_list.size(); i++)
-// {
-// if (_dtok_master_list[i])
-// {
-// delete _dtok_master_list[i];
-// _dtok_master_list[i] = NULL;
-// }
-// }
-// _dtok_master_list.clear();
}
u_int32_t
@@ -122,8 +111,8 @@
if (!_num_msgs)
return 0;
if (maxMsgSize > _msg_buff_size)
- throw rhm::journal::jexception(EXCEPTION_BASE+0, "Message size exceeds
internal buffer limit",
- "msg_producer", "produce");
+ throw rhm::journal::jexception(EXCEPTION_BASE+0,
+ "Message size exceeds internal buffer limit",
"msg_producer", "produce");
{
//std::cout << "[" << _num_msgs << "]" <<
std::flush;
for(u_int32_t msgCntr = 0; msgCntr < _num_msgs && !_interrupt_flag;
msgCntr++)
@@ -277,10 +266,8 @@
msg_producer::send_deferred_dequeues(rhm::journal::jcntl& jc)
{
std::deque<rhm::journal::data_tok*>::iterator ditr = _dd_dtok_list.begin();
-// while (!_dd_dtok_list.empty())
while (ditr != _dd_dtok_list.end())
{
-// rhm::journal::data_tok* ddtokp = _dd_dtok_list.front();
rhm::journal::data_tok* ddtokp = *ditr;
#ifndef RHM_WRONLY
@@ -318,7 +305,6 @@
" dres=" << iores_str[dres] <<
std::flush;
}
}
-// _dd_dtok_list.pop_front();
ditr = _dd_dtok_list.erase(ditr);
}
#ifndef RHM_WRONLY