Author: kpvdr
Date: 2007-09-30 21:34:21 -0400 (Sun, 30 Sep 2007)
New Revision: 955
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
store/trunk/cpp/tests/jrnl/msg_producer.cpp
store/trunk/cpp/tests/jrnl/rtest
Log:
Additional transaction interface work
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-30 14:33:44 UTC (rev 954)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-01 01:34:21 UTC (rev 955)
@@ -797,7 +797,7 @@
while (!written)
{
journal::jcntl* jc =
static_cast<journal::jcntl*>(queue->getExternalQueueStore());
- rhm::journal::iores eres = jc->enqueue_data_record(buff, size, dtokp/*,
txn->getXid(), false*/);
+ rhm::journal::iores eres = jc->enqueue_data_record(buff, size, size, dtokp,
/*txn->getXid(),*/ false);
switch (eres)
{
case rhm::journal::RHM_IORES_SUCCESS:
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-30 14:33:44 UTC (rev 954)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-01 01:34:21 UTC (rev 955)
@@ -193,30 +193,24 @@
const iores
-jcntl::enqueue_data_record(const void* const dbuf, const size_t dlen, data_tok* const
dtok)
+jcntl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, data_tok* dtokp, const bool transient)
throw (jexception)
{
- check_wstatus("enqueue_data");
- return _wmgr.enqueue(dbuf, dlen, dtok);
+ check_wstatus("enqueue_data_record");
+ return _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, transient);
}
const iores
-jcntl::enqueue_data_record(const void* const /*data_buff*/, const size_t
/*tot_data_len*/,
- const size_t /*this_data_len*/, data_tok* /*dtokp*/, const bool /*transient*/)
- throw (jexception)
+jcntl::enqueue_tx_data_record(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, data_tok* dtokp, const std::string& xid,
+ const bool transient) throw (jexception)
{
- return RHM_IORES_NOTIMPL;
+ check_wstatus("enqueue_tx_data_record");
+ return _wmgr.enqueue_tx(data_buff, tot_data_len, this_data_len, dtokp, xid,
transient);
}
const iores
-jcntl::enqueue_data_record(const void* const /*data_buff*/, const size_t
/*tot_data_len*/,
- const size_t /*this_data_len*/, data_tok* /*dtokp*/, const std::string&
/*xid*/,
- const bool /*transient*/) throw (jexception)
-{
- return RHM_IORES_NOTIMPL;
-}
-
-const iores
jcntl::get_data_record(const u_int64_t& /*rid*/, const size_t& /*dsize*/, const
size_t& /*dsize_avail*/,
const void** const /*data*/, bool /*auto_discard*/) throw (jexception)
{
@@ -245,10 +239,10 @@
}
const iores
-jcntl::dequeue_data_record(data_tok* const dtokp, const std::string& /*xid*/) throw
(jexception)
+jcntl::dequeue_tx_data_record(data_tok* const dtokp, const std::string& xid) throw
(jexception)
{
check_wstatus("dequeue_data");
- return _wmgr.dequeue(dtokp);
+ return _wmgr.dequeue_tx(dtokp, xid);
}
const iores
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-30 14:33:44 UTC (rev 954)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-10-01 01:34:21 UTC (rev 955)
@@ -258,10 +258,6 @@
*/
void delete_jrnl_files() throw (jexception);
- // TODO *** DEPRECATED *** Replaced by enqueue_data_record() call below.
- const iores enqueue_data_record(const void* const dbuf, const size_t dlen,
- data_tok* const dtok) throw (jexception);
-
/**
* \brief Enqueue data.
*
@@ -315,7 +311,7 @@
*
* \exception TODO
*/
- const iores enqueue_data_record(const void* const data_buff, const size_t
tot_data_len,
+ const iores enqueue_tx_data_record(const void* const data_buff, const size_t
tot_data_len,
const size_t this_data_len, data_tok* dtokp, const std::string& xid,
const bool transient = false) throw (jexception);
@@ -435,7 +431,7 @@
*
* \exception TODO
*/
- const iores dequeue_data_record(data_tok* const dtokp, const std::string&
xid)
+ const iores dequeue_tx_data_record(data_tok* const dtokp, const std::string&
xid)
throw (jexception);
/**
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-09-30 14:33:44 UTC (rev 954)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-01 01:34:21 UTC (rev 955)
@@ -86,24 +86,29 @@
}
const iores
-wmgr::enqueue(const void* const mbuf, const size_t dlen, data_tok* dtok) throw
(jexception)
+wmgr::enqueue(const void* const data_buff, const size_t tot_data_len,
+ const size_t /*this_data_len*/, data_tok* dtokp, const bool transient)
+ throw (jexception)
{
+ if (transient)
+ return RHM_IORES_NOTIMPL;
+
if (_deq_busy)
return RHM_IORES_BUSY;
- iores res = pre_write_check(true, dtok);
+ iores res = pre_write_check(true, dtokp);
if (res != RHM_IORES_SUCCESS)
return res;
bool cont = false;
if (_enq_busy) // If enqueue() exited last time with RHM_IORES_FULL or
RHM_IORES_AIO_WAIT
{
- if (dtok->wstate() == data_tok::ENQ_PART)
+ if (dtokp->wstate() == data_tok::ENQ_PART)
cont = true;
else
{
std::stringstream ss;
- ss << "This data_tok: id=" << dtok->id() <<
" state=" << dtok->wstate_str();
+ ss << "This data_tok: id=" << dtokp->id() <<
" state=" << dtokp->wstate_str();
throw jexception(jerrno::JERR_WMGR_ENQDISCONT, ss.str(), "wmgr",
"enqueue");
}
}
@@ -111,23 +116,23 @@
_enq_busy = true;
u_int64_t rid;
- if (dtok->getSourceMessage())
+ if (dtokp->getSourceMessage())
{
- rid = dtok->rid();
+ rid = dtokp->rid();
assert(rid != 0);
}
else
rid = cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid();
- _enq_rec.reset(rid, mbuf, dlen);
+ _enq_rec.reset(rid, data_buff, tot_data_len);
if (!cont)
- dtok->set_rid(rid);
+ dtokp->set_rid(rid);
bool done = false;
while (!done)
{
assert(_pg_offset_dblks < JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks *
JRNL_DBLK_SIZE);
- u_int32_t data_offs_dblks = dtok->dblocks_written();
+ u_int32_t data_offs_dblks = dtokp->dblocks_written();
u_int32_t ret = _enq_rec.encode(wptr, data_offs_dblks,
(JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
@@ -139,19 +144,19 @@
#endif
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
- dtok->incr_dblocks_written(ret);
+ dtokp->incr_dblocks_written(ret);
// Is the encoding of this record complete?
- if (dtok->dblocks_written() >= _enq_rec.rec_size_dblks())
+ if (dtokp->dblocks_written() >= _enq_rec.rec_size_dblks())
{
// TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO
returns.
- dtok->set_wstate(data_tok::ENQ_SUBM);
- dtok->set_dsize(dlen);
+ dtokp->set_wstate(data_tok::ENQ_SUBM);
+ dtokp->set_dsize(tot_data_len);
// Only add this data token to page token list when submit is complete, this
way
// long multi-page messages have their token on the page containing the END
of the
// message. AIO callbacks will then only process this token when entire
message is
// enqueued.
- _page_cb_arr[_pg_index]._pdtokl->push_back(dtok);
+ _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
done = true;
}
@@ -181,7 +186,7 @@
if (_page_cb_arr[_pg_index]._state == AIO_PENDING && !done)
{
res = RHM_IORES_AIO_WAIT;
- dtok->set_wstate(data_tok::ENQ_PART);
+ dtokp->set_wstate(data_tok::ENQ_PART);
done = true;
}
@@ -198,81 +203,89 @@
else
{
// Set last data_tok in page only to state ENQ_PART
- dtok->set_wstate(data_tok::ENQ_PART);
+ dtokp->set_wstate(data_tok::ENQ_PART);
done = true;
}
}
}
}
}
- if (dtok->wstate() >= data_tok::ENQ_SUBM)
+ if (dtokp->wstate() >= data_tok::ENQ_SUBM)
_enq_busy = false;
return res;
}
const iores
-wmgr::dequeue(data_tok* dtok) throw (jexception)
+wmgr::enqueue_tx(const void* const /*data_buff*/, const size_t /*tot_data_len*/,
+ const size_t /*this_data_len*/, data_tok* /*dtokp*/, const std::string&
/*xid*/,
+ const bool /*transient*/) throw (jexception)
{
+ return RHM_IORES_NOTIMPL;
+}
+
+const iores
+wmgr::dequeue(data_tok* dtokp) throw (jexception)
+{
if (_enq_busy)
return RHM_IORES_BUSY;
- iores res = pre_write_check(false, dtok);
+ iores res = pre_write_check(false, dtokp);
if (res != RHM_IORES_SUCCESS)
return res;
bool cont = false;
if (_deq_busy) // If dequeue() exited last time with RHM_IORES_FULL or
RHM_IORES_AIO_WAIT
{
- if (dtok->wstate() == data_tok::DEQ_PART)
+ if (dtokp->wstate() == data_tok::DEQ_PART)
cont = true;
else
{
std::stringstream ss;
- ss << "This data_tok: id=" << dtok->id() <<
" state=" << dtok->wstate_str();
+ ss << "This data_tok: id=" << dtokp->id() <<
" state=" << dtokp->wstate_str();
throw jexception(jerrno::JERR_WMGR_DEQDISCONT, ss.str(), "wmgr",
"dequeue");
}
}
else
{
_deq_busy = true;
- dtok->set_dblocks_written(0); // Reset dblks_written from enqueue op
+ dtokp->set_dblocks_written(0); // Reset dblks_written from enqueue op
}
u_int64_t rid;
- if (dtok->getSourceMessage())
+ if (dtokp->getSourceMessage())
{
- rid = dtok->dequeue_rid();
+ rid = dtokp->dequeue_rid();
assert(rid != 0);
}
else
rid = _wrfc.get_incr_rid();
- _deq_rec.reset(rid, dtok->rid());
+ _deq_rec.reset(rid, dtokp->rid());
bool done = false;
while (!done)
{
assert(_pg_offset_dblks < JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks *
JRNL_DBLK_SIZE);
- u_int32_t data_offs_dblks = dtok->dblocks_written();
+ u_int32_t data_offs_dblks = dtokp->dblocks_written();
u_int32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
(JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
if (data_offs_dblks == 0)
{
- u_int16_t fid = _emap.get_remove_fid(dtok->rid());
+ u_int16_t fid = _emap.get_remove_fid(dtokp->rid());
_wrfc.decr_enqcnt(fid);
}
#endif
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
- dtok->incr_dblocks_written(ret);
+ dtokp->incr_dblocks_written(ret);
// Is the encoding of this record complete?
- if (dtok->dblocks_written() >= _deq_rec.rec_size_dblks())
+ if (dtokp->dblocks_written() >= _deq_rec.rec_size_dblks())
{
// TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO
returns.
- dtok->set_wstate(data_tok::DEQ_SUBM);
- _page_cb_arr[_pg_index]._pdtokl->push_back(dtok);
+ dtokp->set_wstate(data_tok::DEQ_SUBM);
+ _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
done = true;
}
@@ -302,7 +315,7 @@
if (_page_cb_arr[_pg_index]._state == AIO_PENDING && !done)
{
res = RHM_IORES_AIO_WAIT;
- dtok->set_wstate(data_tok::DEQ_PART);
+ dtokp->set_wstate(data_tok::DEQ_PART);
done = true;
}
@@ -319,25 +332,25 @@
else
{
// Set last data_tok in page only to state ENQ_PART
- dtok->set_wstate(data_tok::ENQ_PART);
+ dtokp->set_wstate(data_tok::ENQ_PART);
done = true;
}
}
}
}
}
- if (dtok->wstate() >= data_tok::DEQ_SUBM)
+ if (dtokp->wstate() >= data_tok::DEQ_SUBM)
_deq_busy = false;
-// iores res = pre_write_check(false, dtok);
+// iores res = pre_write_check(false, dtokp);
// if (res != RHM_IORES_SUCCESS)
// return res;
//
// u_int64_t rid;
//
-// if (dtok->getSourceMessage())
+// if (dtokp->getSourceMessage())
// {
-// rid = dtok->dequeue_rid();
+// rid = dtokp->dequeue_rid();
// assert(rid != 0);
// }
// else
@@ -350,21 +363,21 @@
// // (currently dblk = 128 bytes). JRNL_DBLK_SIZE must be a power of 2.
// // IF JRNL_DBLK_SIZE IS SET TO < 32 (i.e. 16 OR LESS) BYTES, THIS ENCODING WILL
FAIL!
// //***
-// deq_hdr dhdr(RHM_JDAT_DEQ_MAGIC, rid, dtok->rid(), 0, RHM_JDAT_VERSION);
+// deq_hdr dhdr(RHM_JDAT_DEQ_MAGIC, rid, dtokp->rid(), 0, RHM_JDAT_VERSION);
// void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks *
JRNL_DBLK_SIZE);
// ::memcpy(wptr, &dhdr, sizeof(dhdr));
// #ifdef RHM_CLEAN
// ::memset((char*)wptr + sizeof(dhdr), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE -
sizeof(dhdr));
// #endif
// #if !(defined RHM_WRONLY || defined RHM_RDONLY)
-// u_int16_t fid = _emap.get_remove_fid(dtok->rid());
+// u_int16_t fid = _emap.get_remove_fid(dtokp->rid());
// _wrfc.decr_enqcnt(fid);
// #endif
// _pg_offset_dblks++;
// _cached_offset_dblks++;
// // TODO: Incorrect - must set state to DEQ_CACHED; DEQ_SUBM is set when AIO
returns.
-// dtok->set_wstate(data_tok::DEQ_SUBM);
-// _page_cb_arr[_pg_index]._pdtokl->push_back(dtok);
+// dtokp->set_wstate(data_tok::DEQ_SUBM);
+// _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
// if (_wrfc.empty()) // Has the file_hdr been written?
// write_fhdr(rid, _wrfc.index(), JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
// if (_pg_offset_dblks >= JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
@@ -374,6 +387,12 @@
}
const iores
+wmgr::dequeue_tx(data_tok* /*dtokp*/, const std::string& /*xid*/) throw (jexception)
+{
+ return RHM_IORES_NOTIMPL;
+}
+
+const iores
wmgr::flush()
{
iores res = write_flush();
@@ -549,7 +568,7 @@
}
const iores
-wmgr::pre_write_check(bool enqueue, data_tok* dtok) throw (jexception)
+wmgr::pre_write_check(bool enqueue, data_tok* dtokp) throw (jexception)
{
// Check status of current file
if (!_wrfc.is_reset())
@@ -575,12 +594,12 @@
}
// Check state of data_tok
- bool res = enqueue ? dtok->is_writable() : dtok->is_dequeueable();
+ bool res = enqueue ? dtokp->is_writable() : dtokp->is_dequeueable();
if (!res)
{
std::stringstream ss;
- ss << "op=" << (enqueue ? "enqueue" :
"dequeue") << " dtok_id=" << dtok->id();
- ss << " dtok_state=" << dtok->wstate_str();
+ ss << "op=" << (enqueue ? "enqueue" :
"dequeue") << " dtok_id=" << dtokp->id();
+ ss << " dtok_state=" << dtokp->wstate_str();
throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, ss.str(), "wmgr",
"pre_write_check");
}
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-09-30 14:33:44 UTC (rev 954)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-10-01 01:34:21 UTC (rev 955)
@@ -87,15 +87,20 @@
void initialize(std::deque<data_tok*>* const dtokl, aio_cb wr_cb,
const u_int32_t max_dtokpp, const u_int32_t max_iowait_us) throw
(jexception);
- const iores enqueue(const void* const mbuf, const size_t mlen, data_tok* dtok)
+ const iores enqueue(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, data_tok* dtokp, const bool transient)
throw (jexception);
- const iores dequeue(data_tok* dtok) throw (jexception);
+ const iores enqueue_tx(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, data_tok* dtokp, const std::string& xid,
+ const bool transient) throw (jexception);
+ const iores dequeue(data_tok* dtokp) throw (jexception);
+ const iores dequeue_tx(data_tok* dtokp, const std::string& xid) throw
(jexception);
const iores flush();
const u_int32_t get_events(page_state state) throw (jexception);
private:
void initialize() throw (jexception);
- const iores pre_write_check(bool enqueue, data_tok* dtok) throw (jexception);
+ const iores pre_write_check(bool enqueue, data_tok* dtokp) throw (jexception);
const iores write_flush();
const iores rotate_file();
void dblk_roundup();
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-09-30 14:33:44 UTC (rev 954)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-10-01 01:34:21 UTC (rev 955)
@@ -77,7 +77,7 @@
CPPUNIT_TEST(EncodeTest_024);
CPPUNIT_TEST(EncodeTest_025);
CPPUNIT_TEST(EncodeTest_026);
- CPPUNIT_TEST(EncodeTest_027);
+// CPPUNIT_TEST(EncodeTest_027); // Until race condition fixed
// CPPUNIT_TEST(EncodeTest_028); // Until race condition fixed
CPPUNIT_TEST_SUITE_END();
@@ -721,8 +721,8 @@
CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp !=
NULL);
unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->enqueue_data_record(msg.c_str(), msg.size(),
dtp), jc,
- aio_sleep_cnt, dtp));
+ while (handle_jcntl_response(jc->enqueue_data_record(msg.c_str(), msg.size(),
msg.size(),
+ dtp, false), jc, aio_sleep_cnt, dtp));
}
void deq_msg(rhm::journal::jcntl* jc, u_int64_t rid)
Modified: store/trunk/cpp/tests/jrnl/msg_producer.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_producer.cpp 2007-09-30 14:33:44 UTC (rev 954)
+++ store/trunk/cpp/tests/jrnl/msg_producer.cpp 2007-10-01 01:34:21 UTC (rev 955)
@@ -131,7 +131,7 @@
bool written = false;
while (!written)
{
- rhm::journal::iores eres = jc.enqueue_data_record(msg, size, dtokp);
+ rhm::journal::iores eres = jc.enqueue_data_record(msg, size, size, dtokp,
false);
rhm::journal::data_tok::write_state ws = dtokp->wstate();
const char* wsstr = dtokp->wstate_str();
switch (eres)
Modified: store/trunk/cpp/tests/jrnl/rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/rtest 2007-09-30 14:33:44 UTC (rev 954)
+++ store/trunk/cpp/tests/jrnl/rtest 2007-10-01 01:34:21 UTC (rev 955)
@@ -29,8 +29,8 @@
TEMP_V_FILE=/tmp/v.txt
NUM_JFILES=8
-VG_NORM_FILESIZE=11
-#VG_NORM_FILESIZE=18 # RHEL5 triggers extra valgrind messages when pthreads are in use
+#VG_NORM_FILESIZE=11
+VG_NORM_FILESIZE=18 # RHEL5 triggers extra valgrind messages when pthreads are in use
# Write test
W_DO_TEST=T