Author: kpvdr
Date: 2007-09-27 15:29:25 -0400 (Thu, 27 Sep 2007)
New Revision: 949
Modified:
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/deq_rec.cpp
store/trunk/cpp/lib/jrnl/deq_rec.hpp
store/trunk/cpp/lib/jrnl/dtx_rec.cpp
store/trunk/cpp/lib/jrnl/dtx_rec.hpp
store/trunk/cpp/lib/jrnl/enq_rec.cpp
store/trunk/cpp/lib/jrnl/enq_rec.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jerrno.cpp
store/trunk/cpp/lib/jrnl/jerrno.hpp
store/trunk/cpp/lib/jrnl/jrec.hpp
store/trunk/cpp/lib/jrnl/pmgr.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/tests/persistence.py
Log:
Corrections to class jcntl transactions API; new class deq_rec.encode() now in use.
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -87,6 +87,8 @@
return "ENQ";
case DEQ_CACHED:
return "DEQ_CACHED";
+ case DEQ_PART:
+ return "DEQ_PART";
case DEQ_SUBM:
return "DEQ_SUBM";
case DEQ:
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -75,6 +75,7 @@
ENQ_SUBM, ///< Data block enqueue submitted to AIO
ENQ, ///< Data block enqueue AIO write complete (enqueue complete)
DEQ_CACHED, ///< Data block dequeue written to page cache
+ DEQ_PART, ///< Data block part-submitted to AIO, waiting for page buffer to free up
DEQ_SUBM, ///< Data block dequeue submitted to AIO
DEQ ///< Data block dequeue AIO write complete (dequeue complete)
};
Modified: store/trunk/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -43,12 +43,18 @@
{
deq_rec::deq_rec():
- _deq_hdr(),
+ _deq_hdr(RHM_JDAT_DEQ_MAGIC, 0, 0, 0, RHM_JDAT_VERSION),
_xid(),
_deq_tail()
{}
-deq_rec::deq_rec(const u_int64_t rid, const u_int64_t drid, const std::string xid):
+deq_rec::deq_rec(const u_int64_t rid, const u_int64_t drid):
+ _deq_hdr(RHM_JDAT_DEQ_MAGIC, rid, drid, 0, RHM_JDAT_VERSION),
+ _xid(),
+ _deq_tail(_deq_hdr._hdr)
+{}
+
+deq_rec::deq_rec(const u_int64_t rid, const u_int64_t drid, const std::string& xid):
_deq_hdr(RHM_JDAT_DEQ_MAGIC, rid, drid, xid.size(), RHM_JDAT_VERSION),
_xid(xid),
_deq_tail(_deq_hdr._hdr)
@@ -57,6 +63,24 @@
deq_rec::~deq_rec()
{}
+void
+deq_rec::reset(const u_int64_t rid, const u_int64_t drid)
+{
+ _deq_hdr._hdr._rid = rid;
+ _deq_hdr._deq_rid = drid;
+ _deq_hdr._xidsize = 0;
+ _deq_tail._rid = rid;
+}
+
+void
+deq_rec::reset(const u_int64_t rid, const u_int64_t drid, const std::string& xid)
+{
+ _deq_hdr._hdr._rid = rid;
+ _deq_hdr._deq_rid = drid;
+ _deq_hdr._xidsize = xid.size();
+ _deq_tail._rid = rid;
+}
+
u_int32_t
deq_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) throw (jexception)
{
Modified: store/trunk/cpp/lib/jrnl/deq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -61,9 +61,12 @@
public:
deq_rec();
- deq_rec(const u_int64_t rid, const u_int64_t drid, const std::string xid);
+ deq_rec(const u_int64_t rid, const u_int64_t drid);
+ deq_rec(const u_int64_t rid, const u_int64_t drid, const std::string& xid);
~deq_rec();
+ void reset(const u_int64_t rid, const u_int64_t drid);
+ void reset(const u_int64_t rid, const u_int64_t drid, const std::string& xid);
u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
throw (jexception);
u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
@@ -74,9 +77,9 @@
const size_t rec_size() const;
private:
- virtual void chk_hdr() const throw (jexception) = 0;
- virtual void chk_hdr(u_int64_t rid) const throw (jexception) = 0;
- virtual void chk_tail() const throw (jexception) = 0;
+ virtual void chk_hdr() const throw (jexception);
+ virtual void chk_hdr(u_int64_t rid) const throw (jexception);
+ virtual void chk_tail() const throw (jexception);
}; // class deq_rec
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/dtx_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/dtx_rec.cpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/dtx_rec.cpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -48,7 +48,13 @@
_dtx_tail()
{}
-dtx_rec::dtx_rec(const u_int32_t magic, const u_int64_t rid, const std::string xid):
+dtx_rec::dtx_rec(const u_int32_t magic, const u_int64_t rid):
+ _dtx_hdr(magic, rid, 0, RHM_JDAT_VERSION),
+ _xid(),
+ _dtx_tail(_dtx_hdr._hdr)
+{}
+
+dtx_rec::dtx_rec(const u_int32_t magic, const u_int64_t rid, const std::string& xid):
_dtx_hdr(magic, rid, xid.size(), RHM_JDAT_VERSION),
_xid(xid),
_dtx_tail(_dtx_hdr._hdr)
@@ -57,6 +63,22 @@
dtx_rec::~dtx_rec()
{}
+void
+dtx_rec::reset(const u_int64_t rid)
+{
+ _dtx_hdr._hdr._rid = rid;
+ _dtx_hdr._xidsize = 0;
+ _dtx_tail._rid = rid;
+}
+
+void
+dtx_rec::reset(const u_int64_t rid, const std::string& xid)
+{
+ _dtx_hdr._hdr._rid = rid;
+ _dtx_hdr._xidsize = xid.size();
+ _dtx_tail._rid = rid;
+}
+
u_int32_t
dtx_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) throw (jexception)
{
Modified: store/trunk/cpp/lib/jrnl/dtx_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/dtx_rec.hpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/dtx_rec.hpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -61,9 +61,12 @@
public:
dtx_rec();
- dtx_rec(const u_int32_t magic, const u_int64_t rid, const std::string xid);
+ dtx_rec(const u_int32_t magic, const u_int64_t rid);
+ dtx_rec(const u_int32_t magic, const u_int64_t rid, const std::string& xid);
~dtx_rec();
+ void reset(const u_int64_t rid);
+ void reset(const u_int64_t rid, const std::string& xid);
u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
throw (jexception);
u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -70,6 +70,21 @@
JRNL_DBLK_SIZE)
{}
+// Constructor used for transactional write operations, where dbuf contains data to be written.
+enq_rec::enq_rec(const u_int64_t rid, const void* const dbuf, const size_t dlen,
+ const std::string& xid):
+ jrec(), // superclass
+ _enq_hdr(RHM_JDAT_ENQ_MAGIC, rid, xid.size(), dlen, RHM_JDAT_VERSION),
+ _xid(xid),
+ _data(dbuf),
+ _buff(NULL),
+ _enq_tail(_enq_hdr._hdr),
+ _max_data_size(0),
+ _data_size(dlen),
+ _rec_size(size_dblks(enq_hdr::size() + _enq_hdr._xidsize + dlen + rec_tail::size()) *
+ JRNL_DBLK_SIZE)
+{}
+
// Constructor used for read operations, where buf contains preallocated space
// to receive data.
enq_rec::enq_rec(void* const buf, const size_t bufsize):
@@ -110,6 +125,23 @@
_rec_size = size_dblks(dlen + enq_hdr::size() + rec_tail::size()) * JRNL_DBLK_SIZE;
}
+// Prepare instance for use in writing transactional data to journal, where dbuf contains data to
+// be written.
+void
+enq_rec::reset(const u_int64_t rid, const void* const dbuf, const size_t dlen,
+ const std::string& xid)
+{
+ _enq_hdr._hdr._rid = rid;
+ _enq_hdr._xidsize = xid.size();
+ _enq_hdr._dsize = dlen;
+ _data = dbuf;
+ _buff = NULL;
+ _enq_tail._rid = rid;
+ _max_data_size = 0;
+ _data_size = dlen;
+ _rec_size = size_dblks(dlen + enq_hdr::size() + rec_tail::size()) * JRNL_DBLK_SIZE;
+}
+
// Prepare instance for use in reading data from journal, where buf contains preallocated space
// to receive data.
void
Modified: store/trunk/cpp/lib/jrnl/enq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -67,7 +67,7 @@
public:
/**
* \brief Default constructor; must be used in conjunction with reset() to prepare
- * instance for use with write or read operations.
+ * instance for use with write or read operations.
*/
enq_rec();
@@ -77,8 +77,15 @@
enq_rec(const u_int64_t rid, const void* const dbuf, const size_t dlen);
/**
+ * \brief Constructor used for transactional write operations, where mbuf contains data to
+ * be written.
+ */
+ enq_rec(const u_int64_t rid, const void* const dbuf, const size_t dlen,
+ const std::string& xid);
+
+ /**
* \brief Constructor used for read operations, where buf contains preallocated space
- * to receive data.
+ * to receive data.
*/
enq_rec(void* const buf, const size_t bufsize);
@@ -89,6 +96,8 @@
// Prepare instance for use in writing data to journal
void reset(const u_int64_t rid, const void* const dbuf, const size_t dlen);
+ void reset(const u_int64_t rid, const void* const dbuf, const size_t dlen,
+ const std::string& xid);
// Prepare instance for use in reading data from journal
void reset(void* const buf, const size_t bufsize);
@@ -103,7 +112,6 @@
const size_t data_size() const;
const size_t xid_size() const;
const size_t rec_size() const;
- inline const u_int32_t rec_size_dblks() const { return size_dblks(rec_size()); }
inline const u_int64_t rid() const { return _enq_hdr._hdr._rid; }
void set_rid(const u_int64_t rid);
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -202,7 +202,15 @@
const iores
jcntl::enqueue_data_record(const void* const /*data_buff*/, const size_t /*tot_data_len*/,
- const size_t /*this_data_len*/, data_tok* /*dtokp*/, const std::string /*xid*/,
+ const size_t /*this_data_len*/, data_tok* /*dtokp*/, const bool /*transient*/)
+ throw (jexception)
+{
+ return RHM_IORES_NOTIMPL;
+}
+
+const iores
+jcntl::enqueue_data_record(const void* const /*data_buff*/, const size_t /*tot_data_len*/,
+ const size_t /*this_data_len*/, data_tok* /*dtokp*/, const std::string& /*xid*/,
const bool /*transient*/) throw (jexception)
{
return RHM_IORES_NOTIMPL;
@@ -230,20 +238,27 @@
}
const iores
-jcntl::dequeue_data_record(data_tok* const dtokp, const std::string /*xid*/) throw (jexception)
+jcntl::dequeue_data_record(data_tok* const dtokp) throw (jexception)
{
check_wstatus("dequeue_data");
return _wmgr.dequeue(dtokp);
}
const iores
-jcntl::abort_dtx(const std::string /*xid*/) throw (jexception)
+jcntl::dequeue_data_record(data_tok* const dtokp, const std::string& /*xid*/) throw (jexception)
{
+ check_wstatus("dequeue_data");
+ return _wmgr.dequeue(dtokp);
+}
+
+const iores
+jcntl::abort_dtx(const std::string& /*xid*/) throw (jexception)
+{
return RHM_IORES_NOTIMPL;
}
const iores
-jcntl::commit_dtx(const std::string /*xid*/) throw (jexception)
+jcntl::commit_dtx(const std::string& /*xid*/) throw (jexception)
{
return RHM_IORES_NOTIMPL;
}
@@ -545,9 +560,5 @@
}
-// Static instance of empty string used as default XID parameter and which signifies no XID.
-const std::string jcntl::no_xid;
-
-
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -142,8 +142,6 @@
std::deque<rhm::journal::data_tok*> _aio_wr_cmpl_dtok_list; ///<
Internally mamanged deque
public:
- static const std::string no_xid;
-
/**
* \brief Journal constructor.
*
@@ -304,7 +302,10 @@
* \exception TODO
*/
const iores enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, data_tok* dtokp, const std::string xid = no_xid,
+ const size_t this_data_len, data_tok* dtokp, const bool transient = false)
+ throw (jexception);
+ const iores enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, data_tok* dtokp, const std::string& xid,
const bool transient = false) throw (jexception);
/**
@@ -407,7 +408,8 @@
*
* \exception TODO
*/
- const iores dequeue_data_record(data_tok* const dtokp, const std::string xid = no_xid)
+ const iores dequeue_data_record(data_tok* const dtokp) throw (jexception);
+ const iores dequeue_data_record(data_tok* const dtokp, const std::string& xid)
throw (jexception);
/**
@@ -421,7 +423,7 @@
*
* \exception TODO
*/
- const iores abort_dtx(const std::string xid) throw (jexception);
+ const iores abort_dtx(const std::string& xid) throw (jexception);
/**
* \brief Commit the transaction for all records enqueued or dequeued with the matching xid.
@@ -434,7 +436,7 @@
*
* \exception TODO
*/
- const iores commit_dtx(const std::string xid) throw (jexception);
+ const iores commit_dtx(const std::string& xid) throw (jexception);
/**
* \brief Check whether all the enqueue records for the given xid have reached disk.
@@ -443,7 +445,7 @@
*
* \exception TODO
*/
- const bool is_dtx_synced(const std::string xid) throw (jexception);
+ const bool is_dtx_synced(const std::string& xid) throw (jexception);
/**
* \brief Forces a check for returned AIO write events.
Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -93,6 +93,7 @@
const u_int32_t jerrno::JERR_WMGR_BADPGSTATE = 0x0801;
const u_int32_t jerrno::JERR_WMGR_BADDTOKSTATE = 0x0802;
const u_int32_t jerrno::JERR_WMGR_ENQDISCONT = 0x0803;
+const u_int32_t jerrno::JERR_WMGR_DEQDISCONT = 0x0804; ///< Deq. new dtok when previous part compl.
// class rmgr
const u_int32_t jerrno::JERR_RMGR_UNKNOWNMAGIC = 0x0900;
@@ -182,6 +183,8 @@
"Data token in illegal state for operation.");
_err_map[JERR_WMGR_ENQDISCONT] = std::string("JERR_WMGR_ENQDISCONT: "
"Enqueued new dtok when previous enqueue returned partly completed (state ENQ_PART).");
+ _err_map[JERR_WMGR_DEQDISCONT] = std::string("JERR_WMGR_DEQDISCONT: "
+ "Dequeued new dtok when previous dequeue returned partly completed (state DEQ_PART).");
// class rmgr
_err_map[JERR_RMGR_UNKNOWNMAGIC] = std::string("JERR_RMGR_UNKNOWNMAGIC: "
Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -110,6 +110,7 @@
static const u_int32_t JERR_WMGR_BADPGSTATE; ///< Page buffer in illegal state.
static const u_int32_t JERR_WMGR_BADDTOKSTATE; ///< Data token in illegal state.
static const u_int32_t JERR_WMGR_ENQDISCONT; ///< Enq. new dtok when previous part compl.
+ static const u_int32_t JERR_WMGR_DEQDISCONT; ///< Deq. new dtok when previous part compl.
// class rmgr
static const u_int32_t JERR_RMGR_UNKNOWNMAGIC; ///< Found record with unknown magic
Modified: store/trunk/cpp/lib/jrnl/jrec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jrec.hpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/jrec.hpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -153,6 +153,7 @@
virtual const size_t data_size() const = 0;
virtual const size_t xid_size() const = 0;
virtual const size_t rec_size() const = 0;
+ inline virtual const u_int32_t rec_size_dblks() const { return size_dblks(rec_size()); }
static const u_int32_t size_dblks(const size_t size);
static const u_int32_t size_sblks(const size_t size);
static const u_int32_t size_blks(const size_t size, const size_t blksize);
Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -45,9 +45,11 @@
#include <deque>
#include <libaio.h>
#include <jrnl/aio_cb.hpp>
-#include <jrnl/enq_rec.hpp>
#include <jrnl/data_tok.hpp>
+#include <jrnl/deq_rec.hpp>
+#include <jrnl/dtx_rec.hpp>
#include <jrnl/enq_map.hpp>
+#include <jrnl/enq_rec.hpp>
#include <jrnl/nlfh.hpp>
namespace rhm
@@ -125,7 +127,11 @@
u_int32_t _pg_cntr; ///< Page counter; determines if file rotation req'd
u_int32_t _pg_offset_dblks; ///< Page offset (used so far) in data blocks
u_int32_t _aio_evt_rem; ///< Remaining AIO events
- enq_rec _enq_rec; ///< Data record into/from which data is encoded/decoded
+
+ enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding
+ deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding
+ dtx_rec _dtx_rec; ///< Transaction record used for encoding/decoding
+
// TODO: move _cb down to wmgr, it is the only class that uses it There is no need for
// read callbacks based on AIO. - (check this asertion)
aio_cb _cb; ///< Callback function pointer for AIO events
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -290,11 +290,11 @@
{
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
-//std::cout << " U=" << page_state_str(_page_cb_arr[_pg_index]._state) << std::flush;
+//std::cout << " S=" << page_state_str(_page_cb_arr[_pg_index]._state) << std::flush;
aio_cycle(); // check if any AIOs have returned
return RHM_IORES_AIO_WAIT;
}
-//std::cout << " V" << dtokp->dblocks_read() << "," << dblks_rem() << std::flush;
+//std::cout << " T" << dtokp->dblocks_read() << "," << dblks_rem() << std::flush;
// Read data from this page, first block will have header and data size.
u_int32_t dblks_rd = _enq_rec.decode(h, rptr, dtokp->dblocks_read(), dblks_rem());
@@ -305,7 +305,7 @@
// If data still incomplete, move to next page and decode again
while (dtokp->dblocks_read() < _enq_rec.rec_size_dblks())
{
-//std::cout << " W" << std::flush;
+//std::cout << " U" << std::flush;
rotate_page();
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
@@ -322,7 +322,7 @@
// If we have finished with this page, rotate it
if (dblks_rem() == 0)
-//{std::cout << " X" << std::flush;
+//{std::cout << " V" << std::flush;
rotate_page();
//}
@@ -332,10 +332,58 @@
return RHM_IORES_SUCCESS;
}
+const iores
+rmgr::read_deq(hdr& h, void* rptr, data_tok* dtokp) throw (jexception)
+{
+ if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+ {
+//std::cout << " W=" << page_state_str(_page_cb_arr[_pg_index]._state) << std::flush;
+ aio_cycle(); // check if any AIOs have returned
+ return RHM_IORES_AIO_WAIT;
+ }
+//std::cout << " X" << dtokp->dblocks_read() << "," << dblks_rem() << std::flush;
+
+ // Read data from this page, first block will have header and data size.
+ u_int32_t dblks_rd = _deq_rec.decode(h, rptr, dtokp->dblocks_read(), dblks_rem());
+ dtokp->incr_dblocks_read(dblks_rd);
+
+ _pg_offset_dblks += dblks_rd;
+
+ // If data still incomplete, move to next page and decode again
+ // TODO
+ while (dtokp->dblocks_read() < _enq_rec.rec_size_dblks())
+ {
+//std::cout << " Y" << std::flush;
+ rotate_page();
+ if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+ {
+ dtokp->set_rstate(data_tok::READ_PART);
+ dtokp->set_dsize(_enq_rec.data_size());
+ return RHM_IORES_AIO_WAIT;
+ }
+
+ rptr = (void*)((char*)_page_ptr_arr[_pg_index]);
+ dblks_rd = _enq_rec.decode(h, rptr, dtokp->dblocks_read(), dblks_rem());
+ dtokp->incr_dblocks_read(dblks_rd);
+ _pg_offset_dblks += dblks_rd;
+ }
+
+ // If we have finished with this page, rotate it
+ if (dblks_rem() == 0)
+//{std::cout << " Z" << std::flush;
+ rotate_page();
+//}
+
+ // Set the record size in dtokp
+ dtokp->set_rstate(data_tok::READ);
+ dtokp->set_dsize(_enq_rec.data_size());
+ return RHM_IORES_SUCCESS;
+}
+
+// *** DEPRECATED ***, to be removed, use read_deq()
void
rmgr::consume_deq() throw (jexception)
{
- // TODO: Check: Assumption - dequeue record fills one dblk, but dblk size is tunable.
_pg_offset_dblks++;
if (dblks_rem() == 0)
rotate_page();
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -75,7 +75,8 @@
private:
void initialize() throw (jexception);
const iores read_enq(hdr& h, void* rptr, data_tok* dtok) throw (jexception);
- void consume_deq() throw (jexception);
+ const iores read_deq(hdr& h, void* rptr, data_tok* dtok) throw (jexception);
+ void consume_deq() throw (jexception); // DEPRECATED, to be removed, use read_deq()
void consume_filler() throw (jexception);
const iores skip(data_tok* dtokp) throw (jexception);
void aio_cycle() throw (jexception);
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -49,7 +49,8 @@
_max_io_wait_us(0),
_fhdr_buff(NULL),
_cached_offset_dblks(0),
- _enq_busy(false)
+ _enq_busy(false),
+ _deq_busy(false)
{}
wmgr::wmgr(jcntl* jc, enq_map& emap, wrfc& wrfc, std::deque<data_tok*>* const dtokl,
@@ -60,7 +61,8 @@
_max_io_wait_us(max_iowait_us),
_fhdr_buff(NULL),
_cached_offset_dblks(0),
- _enq_busy(false)
+ _enq_busy(false),
+ _deq_busy(false)
{}
wmgr::~wmgr()
@@ -86,6 +88,9 @@
const iores
wmgr::enqueue(const void* const mbuf, const size_t dlen, data_tok* dtok) throw (jexception)
{
+ if (_deq_busy)
+ return RHM_IORES_BUSY;
+
iores res = pre_write_check(true, dtok);
if (res != RHM_IORES_SUCCESS)
return res;
@@ -215,6 +220,24 @@
if (res != RHM_IORES_SUCCESS)
return res;
+ bool cont = false;
+ if (_deq_busy) // If dequeue() exited last time with RHM_IORES_FULL or RHM_IORES_AIO_WAIT
+ {
+ if (dtok->wstate() == data_tok::DEQ_PART)
+ cont = true;
+ else
+ {
+ std::stringstream ss;
+ ss << "This data_tok: id=" << dtok->id() << " state=" << dtok->wstate_str();
+ throw jexception(jerrno::JERR_WMGR_DEQDISCONT, ss.str(), "wmgr", "dequeue");
+ }
+ }
+ else
+ {
+ _deq_busy = true;
+ dtok->set_dblocks_written(0); // Reset dblks_written from enqueue op
+ }
+
u_int64_t rid;
if (dtok->getSourceMessage())
{
@@ -224,34 +247,129 @@
else
rid = _wrfc.get_incr_rid();
-
- //***
- // NOTE: ASSUMPTION: sizeof(deq_hdr) <= JRNL_DBLK_SIZE
- // This encoding is a simplification: it assumes deq_hdr (currently 20 bytes) fits inside
- // one dblk. The dblk is a tunable parameter, but is unlikely to go lower than deq_hdr
- // (currently dblk = 128 bytes). JRNL_DBLK_SIZE must be a power of 2.
- // IF JRNL_DBLK_SIZE IS SET TO < 32 (i.e. 16 OR LESS) BYTES, THIS ENCODING WILL FAIL!
- //***
- deq_hdr dhdr(RHM_JDAT_DEQ_MAGIC, rid, dtok->rid(), 0, RHM_JDAT_VERSION);
- void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
- ::memcpy(wptr, &dhdr, sizeof(dhdr));
-#ifdef RHM_CLEAN
- ::memset((char*)wptr + sizeof(dhdr), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(dhdr));
+ _deq_rec.reset(rid, dtok->rid());
+ bool done = false;
+ while (!done)
+ {
+ assert(_pg_offset_dblks < JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
+ void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
+ u_int32_t data_offs_dblks = dtok->dblocks_written();
+ u_int32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
+ (JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
+#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
+ if (data_offs_dblks == 0)
+ {
+ u_int16_t fid = _emap.get_remove_fid(dtok->rid());
+ _wrfc.decr_enqcnt(fid);
+ }
#endif
-#if !(defined RHM_WRONLY || defined RHM_RDONLY)
- u_int16_t fid = _emap.get_remove_fid(dtok->rid());
- _wrfc.decr_enqcnt(fid);
-#endif
- _pg_offset_dblks++;
- _cached_offset_dblks++;
- // TODO: Incorrect - must set state to DEQ_CACHED; DEQ_SUBM is set when AIO returns.
- dtok->set_wstate(data_tok::DEQ_SUBM);
- _page_cb_arr[_pg_index]._pdtokl->push_back(dtok);
- if (_wrfc.empty()) // Has the file_hdr been written?
- write_fhdr(rid, _wrfc.index(), JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
- if (_pg_offset_dblks >= JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
- res = flush();
+ _pg_offset_dblks += ret;
+ _cached_offset_dblks += ret;
+ dtok->incr_dblocks_written(ret);
+ // Is the encoding of this record complete?
+ if (dtok->dblocks_written() >= _deq_rec.rec_size_dblks())
+ {
+ // TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO returns.
+ dtok->set_wstate(data_tok::DEQ_SUBM);
+ _page_cb_arr[_pg_index]._pdtokl->push_back(dtok);
+ done = true;
+ }
+
+ // Has the file header been written (i.e. write pointers still at 0)?
+ if (_wrfc.empty())
+ {
+ u_int32_t rec_dblks_rem = _enq_rec.rec_size_dblks() - data_offs_dblks;
+ bool file_fit = rec_dblks_rem <= JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
+ bool file_full = rec_dblks_rem == JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
+ size_t fro = 0;
+ if (cont)
+ {
+ if (file_fit && !file_full)
+ fro = (rec_dblks_rem + JRNL_SBLK_SIZE) * JRNL_DBLK_SIZE;
+ }
+ else
+ fro = JRNL_SBLK_SIZE * JRNL_DBLK_SIZE;
+ write_fhdr(rid, _wrfc.index(), fro);
+ }
+
+ // Is the page full? If so, flush.
+ if (_pg_offset_dblks >= JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
+ {
+ res = write_flush();
+ assert(res == RHM_IORES_SUCCESS);
+
+ if (_page_cb_arr[_pg_index]._state == AIO_PENDING && !done)
+ {
+ res = RHM_IORES_AIO_WAIT;
+ dtok->set_wstate(data_tok::DEQ_PART);
+ done = true;
+ }
+
+ // File full?
+ if (_pg_cntr >= (JRNL_FILE_SIZE / JRNL_WMGR_PAGE_SIZE))
+ {
+ iores rfres = rotate_file();
+ if (rfres != RHM_IORES_SUCCESS)
+ res = rfres;
+ if (!done)
+ {
+ if (rfres == RHM_IORES_SUCCESS)
+ cont = true;
+ else
+ {
+ // Set last data_tok in page only to state ENQ_PART
+ dtok->set_wstate(data_tok::ENQ_PART);
+ done = true;
+ }
+ }
+ }
+ }
+ }
+ if (dtok->wstate() >= data_tok::DEQ_SUBM)
+ _deq_busy = false;
+
+// iores res = pre_write_check(false, dtok);
+// if (res != RHM_IORES_SUCCESS)
+// return res;
+//
+// u_int64_t rid;
+//
+// if (dtok->getSourceMessage())
+// {
+// rid = dtok->dequeue_rid();
+// assert(rid != 0);
+// }
+// else
+// rid = _wrfc.get_incr_rid();
+//
+// //***
+// // NOTE: ASSUMPTION: sizeof(deq_hdr) <= JRNL_DBLK_SIZE
+// // This encoding is a simplification: it assumes deq_hdr (currently 20 bytes) fits inside
+// // one dblk. The dblk is a tunable parameter, but is unlikely to go lower than deq_hdr
+// // (currently dblk = 128 bytes). JRNL_DBLK_SIZE must be a power of 2.
+// // IF JRNL_DBLK_SIZE IS SET TO < 32 (i.e. 16 OR LESS) BYTES, THIS ENCODING WILL FAIL!
+// //***
+// deq_hdr dhdr(RHM_JDAT_DEQ_MAGIC, rid, dtok->rid(), 0, RHM_JDAT_VERSION);
+// void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
+// ::memcpy(wptr, &dhdr, sizeof(dhdr));
+// #ifdef RHM_CLEAN
+// ::memset((char*)wptr + sizeof(dhdr), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(dhdr));
+// #endif
+// #if !(defined RHM_WRONLY || defined RHM_RDONLY)
+// u_int16_t fid = _emap.get_remove_fid(dtok->rid());
+// _wrfc.decr_enqcnt(fid);
+// #endif
+// _pg_offset_dblks++;
+// _cached_offset_dblks++;
+// // TODO: Incorrect - must set state to DEQ_CACHED; DEQ_SUBM is set when AIO returns.
+// dtok->set_wstate(data_tok::DEQ_SUBM);
+// _page_cb_arr[_pg_index]._pdtokl->push_back(dtok);
+// if (_wrfc.empty()) // Has the file_hdr been written?
+// write_fhdr(rid, _wrfc.index(), JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
+// if (_pg_offset_dblks >= JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
+// res = flush();
+
return res;
}
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-09-27 19:29:25 UTC (rev 949)
@@ -77,6 +77,7 @@
std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list
// TODO: Convert _enq_busy into a proper threadsafe lock
bool _enq_busy; ///< Flag true if enqueue is in progress
+ bool _deq_busy; ///< Flag true if dequeue is in progress
public:
wmgr(jcntl* jc, enq_map& emap, wrfc& wrfc);
Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py 2007-09-27 15:18:31 UTC (rev 948)
+++ store/trunk/cpp/tests/persistence.py 2007-09-27 19:29:25 UTC (rev 949)
@@ -354,6 +354,12 @@
self.channel.session_close()
+
+ # Crude fix to wait for thread in client to exit after return from session_close()
+ # Reduces occurrences of "Unhandled exception in thread" messages after each test
+ import time
+ time.sleep(1)
+
return True