rhmessaging commits: r957 - in store/trunk/cpp: lib/jrnl and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-10-01 10:46:01 -0400 (Mon, 01 Oct 2007)
New Revision: 957
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/rcvdat.hpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
Log:
Added PreparedTransactionList to jcntl::recover() fns
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-01 11:51:31 UTC (rev 956)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-01 14:46:01 UTC (rev 957)
@@ -304,7 +304,7 @@
try
{
- jQueue->recover(); // start recovery
+ jQueue->recover(prepared); // start recovery
recoverMessages(txn, registry, queue, prepared, messages);
jQueue->recover_complete(); // start journal.
} catch (journal::jexception& e) {
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-01 11:51:31 UTC (rev 956)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-01 14:46:01 UTC (rev 957)
@@ -95,9 +95,7 @@
}
_datafh = ::new lfh*[JRNL_NUM_FILES];
- // NOTE: We NULL the pointer array prior to setting the pointers because exceptions
- // can be thrown during pointer initialization, and the clean() fn that will be
- // called after an exception will attempt to free any non-null pointer.
+ // NULL the pointer array first because new() can throw exceptions
::memset(_datafh, 0, sizeof(lfh*) * JRNL_NUM_FILES);
for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
{
@@ -123,20 +121,20 @@
void
jcntl::recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb, std::deque<data_tok*>* wr_dtokl,
- const aio_cb wr_cb) throw (jexception)
+ const aio_cb wr_cb, boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list)
+ throw (jexception)
{
+ // Create list of prepared xids
+ std::set<std::string> prep_xid_list;
+ for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
+ i != prep_tx_list.end(); i++);
+// prep_xid_list.insert(i->??);
+
// Verify journal dir and journal files
_jdir.verify_dir();
_rcvdat.reset();
_emap.clear();
rcvr_janalyze(_rcvdat);
-//std::cout << "em=" << _emap.size() << "; rd=" << rd._ffid << ":0x" << std::hex << rd._fro << std::dec << "-" << rd._lfid << ":0x" << std::hex << rd._eo << std::dec << " ";
-//std::vector<u_int64_t> kv;
-//std::vector<u_int64_t>::iterator ki;
-//_emap.rid_list(kv);
-//std::cout << " rids=[";
-//for (ki=kv.begin(); ki<kv.end(); ki++) {if (ki!=kv.begin()) std::cout << ", "; std::cout << *ki;}
-//std::cout << "]" << std::flush;
if (_datafh)
{
@@ -147,9 +145,7 @@
}
_datafh = ::new lfh*[JRNL_NUM_FILES];
- // NOTE: We NULL the pointer array prior to setting the pointers because exceptions
- // can be thrown during pointer initialization, and the clean() fn that will be
- // called after an exception will attempt to free any non-null pointer.
+ // NULL the pointer array first because new() can throw exceptions
::memset(_datafh, 0, sizeof(lfh*) * JRNL_NUM_FILES);
for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
{
@@ -162,7 +158,6 @@
// NOTE: The write RFC must initialize first. This sets all the file handle object
// (lfh) counters and pointers for both read and write, since write activity
// constrains read activity (i.e. one can't read what has not yet been written).
-// _wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, _rcvdat._lfid, _rcvdat._h_rid + 1);
_wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, &_rcvdat);
_rrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, _rcvdat._ffid);
_rmgr.initialize(rd_dtokl, rd_cb);
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-10-01 11:51:31 UTC (rev 956)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-10-01 14:46:01 UTC (rev 957)
@@ -49,6 +49,8 @@
#include <jrnl/wmgr.hpp>
#include <jrnl/wrfc.hpp>
#include <qpid/broker/PersistableQueue.h>
+#include <PreparedTransaction.h>
+#include <boost/ptr_container/ptr_list.hpp>
namespace rhm
{
@@ -221,7 +223,8 @@
* \exception TODO
*/
void recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
- std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb) throw (jexception);
+ std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb,
+ boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list) throw (jexception);
/**
* \brief Recover using internal default callbacks and data_tok lists.
@@ -230,10 +233,11 @@
*
* \exception TODO
*/
- void recover() throw (jexception)
+ void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list)
+ throw (jexception)
{
recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
- &aio_wr_callback );
+ &aio_wr_callback, prep_tx_list);
}
/**
@@ -379,8 +383,6 @@
/**
* \brief Discard (skip) next record to be read without reading or retrieving it.
*
- * \param dtokp Pointer to data token which contains the details of the enqueue operation.
- *
* \exception TODO
*/
const iores discard_data_record() throw (jexception);
Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-10-01 11:51:31 UTC (rev 956)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-10-01 14:46:01 UTC (rev 957)
@@ -32,6 +32,7 @@
#ifndef rhm_journal_rcvdat_hpp
#define rhm_journal_rcvdat_hpp
+#include <map>
#include <vector>
namespace rhm
@@ -41,6 +42,10 @@
struct rcvdat
{
+ typedef std::vector<u_int64_t> rid_list;
+ typedef std::pair<rid_list, rid_list> enq_deq_rid_list;
+ typedef std::map<std::string, enq_deq_rid_list> enq_deq_map;
+
bool _empty; ///< Journal data files empty
u_int16_t _ffid; ///< First file id
size_t _fro; ///< First record offset in ffid
@@ -48,6 +53,8 @@
size_t _eo; ///< End offset (first byte past last record)
u_int64_t _h_rid; ///< Highest rid found
std::vector<u_int32_t> _enq_cnt_list; ///< Number enqueued records found for each file
+ enq_deq_map _edm; ///< Map of enqueue and dequeue rids for each xid
+
rcvdat():
_empty(true),
_ffid(0),
@@ -55,7 +62,8 @@
_lfid(0),
_eo(0),
_h_rid(0),
- _enq_cnt_list(JRNL_NUM_FILES)
+ _enq_cnt_list(JRNL_NUM_FILES),
+ _edm()
{}
void reset()
{
@@ -66,6 +74,7 @@
_eo=0;
_h_rid=0;
_enq_cnt_list.clear();
+ _edm.clear();
}
};
}
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-10-01 11:51:31 UTC (rev 956)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-10-01 14:46:01 UTC (rev 957)
@@ -34,6 +34,9 @@
#include "msg_consumer.hpp"
#include "jtest.hpp"
+#include <PreparedTransaction.h>
+#include <boost/ptr_container/ptr_list.hpp>
+
#define MAX_MSG_SIZE 127
#define NUM_MSGS 5
#define MAX_AIO_SLEEPS 500
@@ -155,6 +158,7 @@
void EmptyRecoverTest()
{
+ boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
//Stack
char* test_name = "EmptyRecoverTest_Stack";
try
@@ -165,11 +169,11 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
+ jc.recover(txn_list);
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
+ jc.recover(txn_list);
jc.recover_complete();
}
}
@@ -194,14 +198,14 @@
{
rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover();
+ jcp->recover(txn_list);
delete jcp;
jcp = NULL;
}
{
rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover();
+ jcp->recover(txn_list);
jcp->recover_complete();
delete jcp;
}
@@ -257,6 +261,7 @@
void RecoverReadTest()
{
+ boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
//Stack
char* test_name = "RecoverReadTest_Stack";
try
@@ -269,7 +274,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
+ jc.recover(txn_list);
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
@@ -298,7 +303,7 @@
{
rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover();
+ jcp->recover(txn_list);
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);
@@ -317,6 +322,7 @@
void RecoveredReadTest()
{
+ boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
//Stack
char* test_name = "RecoveredReadTest_Stack";
try
@@ -329,7 +335,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
+ jc.recover(txn_list);
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
@@ -362,7 +368,7 @@
{
rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover();
+ jcp->recover(txn_list);
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);
@@ -385,6 +391,7 @@
void RecoveredDequeueTest()
{
+ boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
//Stack
char* test_name = "RecoveredDequeueTest_Stack";
try
@@ -397,7 +404,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
+ jc.recover(txn_list);
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
@@ -432,7 +439,7 @@
{
rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover();
+ jcp->recover(txn_list);
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);
@@ -457,6 +464,7 @@
void ComplexRecoveryTest1()
{
+ boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
//Stack
char* test_name = "ComplexRecoveryTest1_Stack";
try
@@ -477,7 +485,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
- jc.recover();
+ jc.recover(txn_list);
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
@@ -530,7 +538,7 @@
{
rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
- jcp->recover();
+ jcp->recover(txn_list);
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);
17 years, 2 months
rhmessaging commits: r956 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-10-01 07:51:31 -0400 (Mon, 01 Oct 2007)
New Revision: 956
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
Log:
Fixed broken build
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-01 01:34:21 UTC (rev 955)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-01 11:51:31 UTC (rev 956)
@@ -910,7 +910,7 @@
{
rhm::journal::iores dres;
try {
- dres = jc->dequeue_data_record(ddtokp, tid);
+ dres = jc->dequeue_tx_data_record(ddtokp, tid);
} catch (rhm::journal::jexception& e) {
std::string str;
delete ddtokp;
17 years, 2 months
rhmessaging commits: r955 - in store/trunk/cpp: lib/jrnl and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-09-30 21:34:21 -0400 (Sun, 30 Sep 2007)
New Revision: 955
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
store/trunk/cpp/tests/jrnl/msg_producer.cpp
store/trunk/cpp/tests/jrnl/rtest
Log:
Additional transaction interface work
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-30 14:33:44 UTC (rev 954)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-01 01:34:21 UTC (rev 955)
@@ -797,7 +797,7 @@
while (!written)
{
journal::jcntl* jc = static_cast<journal::jcntl*>(queue->getExternalQueueStore());
- rhm::journal::iores eres = jc->enqueue_data_record(buff, size, dtokp/*, txn->getXid(), false*/);
+ rhm::journal::iores eres = jc->enqueue_data_record(buff, size, size, dtokp, /*txn->getXid(),*/ false);
switch (eres)
{
case rhm::journal::RHM_IORES_SUCCESS:
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-30 14:33:44 UTC (rev 954)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-01 01:34:21 UTC (rev 955)
@@ -193,30 +193,24 @@
const iores
-jcntl::enqueue_data_record(const void* const dbuf, const size_t dlen, data_tok* const dtok)
+jcntl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, data_tok* dtokp, const bool transient)
throw (jexception)
{
- check_wstatus("enqueue_data");
- return _wmgr.enqueue(dbuf, dlen, dtok);
+ check_wstatus("enqueue_data_record");
+ return _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, transient);
}
const iores
-jcntl::enqueue_data_record(const void* const /*data_buff*/, const size_t /*tot_data_len*/,
- const size_t /*this_data_len*/, data_tok* /*dtokp*/, const bool /*transient*/)
- throw (jexception)
+jcntl::enqueue_tx_data_record(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, data_tok* dtokp, const std::string& xid,
+ const bool transient) throw (jexception)
{
- return RHM_IORES_NOTIMPL;
+ check_wstatus("enqueue_tx_data_record");
+ return _wmgr.enqueue_tx(data_buff, tot_data_len, this_data_len, dtokp, xid, transient);
}
const iores
-jcntl::enqueue_data_record(const void* const /*data_buff*/, const size_t /*tot_data_len*/,
- const size_t /*this_data_len*/, data_tok* /*dtokp*/, const std::string& /*xid*/,
- const bool /*transient*/) throw (jexception)
-{
- return RHM_IORES_NOTIMPL;
-}
-
-const iores
jcntl::get_data_record(const u_int64_t& /*rid*/, const size_t& /*dsize*/, const size_t& /*dsize_avail*/,
const void** const /*data*/, bool /*auto_discard*/) throw (jexception)
{
@@ -245,10 +239,10 @@
}
const iores
-jcntl::dequeue_data_record(data_tok* const dtokp, const std::string& /*xid*/) throw (jexception)
+jcntl::dequeue_tx_data_record(data_tok* const dtokp, const std::string& xid) throw (jexception)
{
check_wstatus("dequeue_data");
- return _wmgr.dequeue(dtokp);
+ return _wmgr.dequeue_tx(dtokp, xid);
}
const iores
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-30 14:33:44 UTC (rev 954)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-10-01 01:34:21 UTC (rev 955)
@@ -258,10 +258,6 @@
*/
void delete_jrnl_files() throw (jexception);
- // TODO *** DEPRECATED *** Replaced by enqueue_data_record() call below.
- const iores enqueue_data_record(const void* const dbuf, const size_t dlen,
- data_tok* const dtok) throw (jexception);
-
/**
* \brief Enqueue data.
*
@@ -315,7 +311,7 @@
*
* \exception TODO
*/
- const iores enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
+ const iores enqueue_tx_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const std::string& xid,
const bool transient = false) throw (jexception);
@@ -435,7 +431,7 @@
*
* \exception TODO
*/
- const iores dequeue_data_record(data_tok* const dtokp, const std::string& xid)
+ const iores dequeue_tx_data_record(data_tok* const dtokp, const std::string& xid)
throw (jexception);
/**
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-09-30 14:33:44 UTC (rev 954)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-01 01:34:21 UTC (rev 955)
@@ -86,24 +86,29 @@
}
const iores
-wmgr::enqueue(const void* const mbuf, const size_t dlen, data_tok* dtok) throw (jexception)
+wmgr::enqueue(const void* const data_buff, const size_t tot_data_len,
+ const size_t /*this_data_len*/, data_tok* dtokp, const bool transient)
+ throw (jexception)
{
+ if (transient)
+ return RHM_IORES_NOTIMPL;
+
if (_deq_busy)
return RHM_IORES_BUSY;
- iores res = pre_write_check(true, dtok);
+ iores res = pre_write_check(true, dtokp);
if (res != RHM_IORES_SUCCESS)
return res;
bool cont = false;
if (_enq_busy) // If enqueue() exited last time with RHM_IORES_FULL or RHM_IORES_AIO_WAIT
{
- if (dtok->wstate() == data_tok::ENQ_PART)
+ if (dtokp->wstate() == data_tok::ENQ_PART)
cont = true;
else
{
std::stringstream ss;
- ss << "This data_tok: id=" << dtok->id() << " state=" << dtok->wstate_str();
+ ss << "This data_tok: id=" << dtokp->id() << " state=" << dtokp->wstate_str();
throw jexception(jerrno::JERR_WMGR_ENQDISCONT, ss.str(), "wmgr", "enqueue");
}
}
@@ -111,23 +116,23 @@
_enq_busy = true;
u_int64_t rid;
- if (dtok->getSourceMessage())
+ if (dtokp->getSourceMessage())
{
- rid = dtok->rid();
+ rid = dtokp->rid();
assert(rid != 0);
}
else
rid = cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid();
- _enq_rec.reset(rid, mbuf, dlen);
+ _enq_rec.reset(rid, data_buff, tot_data_len);
if (!cont)
- dtok->set_rid(rid);
+ dtokp->set_rid(rid);
bool done = false;
while (!done)
{
assert(_pg_offset_dblks < JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
- u_int32_t data_offs_dblks = dtok->dblocks_written();
+ u_int32_t data_offs_dblks = dtokp->dblocks_written();
u_int32_t ret = _enq_rec.encode(wptr, data_offs_dblks,
(JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
@@ -139,19 +144,19 @@
#endif
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
- dtok->incr_dblocks_written(ret);
+ dtokp->incr_dblocks_written(ret);
// Is the encoding of this record complete?
- if (dtok->dblocks_written() >= _enq_rec.rec_size_dblks())
+ if (dtokp->dblocks_written() >= _enq_rec.rec_size_dblks())
{
// TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO returns.
- dtok->set_wstate(data_tok::ENQ_SUBM);
- dtok->set_dsize(dlen);
+ dtokp->set_wstate(data_tok::ENQ_SUBM);
+ dtokp->set_dsize(tot_data_len);
// Only add this data token to page token list when submit is complete, this way
// long multi-page messages have their token on the page containing the END of the
// message. AIO callbacks will then only process this token when entire message is
// enqueued.
- _page_cb_arr[_pg_index]._pdtokl->push_back(dtok);
+ _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
done = true;
}
@@ -181,7 +186,7 @@
if (_page_cb_arr[_pg_index]._state == AIO_PENDING && !done)
{
res = RHM_IORES_AIO_WAIT;
- dtok->set_wstate(data_tok::ENQ_PART);
+ dtokp->set_wstate(data_tok::ENQ_PART);
done = true;
}
@@ -198,81 +203,89 @@
else
{
// Set last data_tok in page only to state ENQ_PART
- dtok->set_wstate(data_tok::ENQ_PART);
+ dtokp->set_wstate(data_tok::ENQ_PART);
done = true;
}
}
}
}
}
- if (dtok->wstate() >= data_tok::ENQ_SUBM)
+ if (dtokp->wstate() >= data_tok::ENQ_SUBM)
_enq_busy = false;
return res;
}
const iores
-wmgr::dequeue(data_tok* dtok) throw (jexception)
+wmgr::enqueue_tx(const void* const /*data_buff*/, const size_t /*tot_data_len*/,
+ const size_t /*this_data_len*/, data_tok* /*dtokp*/, const std::string& /*xid*/,
+ const bool /*transient*/) throw (jexception)
{
+ return RHM_IORES_NOTIMPL;
+}
+
+const iores
+wmgr::dequeue(data_tok* dtokp) throw (jexception)
+{
if (_enq_busy)
return RHM_IORES_BUSY;
- iores res = pre_write_check(false, dtok);
+ iores res = pre_write_check(false, dtokp);
if (res != RHM_IORES_SUCCESS)
return res;
bool cont = false;
if (_deq_busy) // If dequeue() exited last time with RHM_IORES_FULL or RHM_IORES_AIO_WAIT
{
- if (dtok->wstate() == data_tok::DEQ_PART)
+ if (dtokp->wstate() == data_tok::DEQ_PART)
cont = true;
else
{
std::stringstream ss;
- ss << "This data_tok: id=" << dtok->id() << " state=" << dtok->wstate_str();
+ ss << "This data_tok: id=" << dtokp->id() << " state=" << dtokp->wstate_str();
throw jexception(jerrno::JERR_WMGR_DEQDISCONT, ss.str(), "wmgr", "dequeue");
}
}
else
{
_deq_busy = true;
- dtok->set_dblocks_written(0); // Reset dblks_written from enqueue op
+ dtokp->set_dblocks_written(0); // Reset dblks_written from enqueue op
}
u_int64_t rid;
- if (dtok->getSourceMessage())
+ if (dtokp->getSourceMessage())
{
- rid = dtok->dequeue_rid();
+ rid = dtokp->dequeue_rid();
assert(rid != 0);
}
else
rid = _wrfc.get_incr_rid();
- _deq_rec.reset(rid, dtok->rid());
+ _deq_rec.reset(rid, dtokp->rid());
bool done = false;
while (!done)
{
assert(_pg_offset_dblks < JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
- u_int32_t data_offs_dblks = dtok->dblocks_written();
+ u_int32_t data_offs_dblks = dtokp->dblocks_written();
u_int32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
(JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
if (data_offs_dblks == 0)
{
- u_int16_t fid = _emap.get_remove_fid(dtok->rid());
+ u_int16_t fid = _emap.get_remove_fid(dtokp->rid());
_wrfc.decr_enqcnt(fid);
}
#endif
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
- dtok->incr_dblocks_written(ret);
+ dtokp->incr_dblocks_written(ret);
// Is the encoding of this record complete?
- if (dtok->dblocks_written() >= _deq_rec.rec_size_dblks())
+ if (dtokp->dblocks_written() >= _deq_rec.rec_size_dblks())
{
// TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO returns.
- dtok->set_wstate(data_tok::DEQ_SUBM);
- _page_cb_arr[_pg_index]._pdtokl->push_back(dtok);
+ dtokp->set_wstate(data_tok::DEQ_SUBM);
+ _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
done = true;
}
@@ -302,7 +315,7 @@
if (_page_cb_arr[_pg_index]._state == AIO_PENDING && !done)
{
res = RHM_IORES_AIO_WAIT;
- dtok->set_wstate(data_tok::DEQ_PART);
+ dtokp->set_wstate(data_tok::DEQ_PART);
done = true;
}
@@ -319,25 +332,25 @@
else
{
// Set last data_tok in page only to state ENQ_PART
- dtok->set_wstate(data_tok::ENQ_PART);
+ dtokp->set_wstate(data_tok::ENQ_PART);
done = true;
}
}
}
}
}
- if (dtok->wstate() >= data_tok::DEQ_SUBM)
+ if (dtokp->wstate() >= data_tok::DEQ_SUBM)
_deq_busy = false;
-// iores res = pre_write_check(false, dtok);
+// iores res = pre_write_check(false, dtokp);
// if (res != RHM_IORES_SUCCESS)
// return res;
//
// u_int64_t rid;
//
-// if (dtok->getSourceMessage())
+// if (dtokp->getSourceMessage())
// {
-// rid = dtok->dequeue_rid();
+// rid = dtokp->dequeue_rid();
// assert(rid != 0);
// }
// else
@@ -350,21 +363,21 @@
// // (currently dblk = 128 bytes). JRNL_DBLK_SIZE must be a power of 2.
// // IF JRNL_DBLK_SIZE IS SET TO < 32 (i.e. 16 OR LESS) BYTES, THIS ENCODING WILL FAIL!
// //***
-// deq_hdr dhdr(RHM_JDAT_DEQ_MAGIC, rid, dtok->rid(), 0, RHM_JDAT_VERSION);
+// deq_hdr dhdr(RHM_JDAT_DEQ_MAGIC, rid, dtokp->rid(), 0, RHM_JDAT_VERSION);
// void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
// ::memcpy(wptr, &dhdr, sizeof(dhdr));
// #ifdef RHM_CLEAN
// ::memset((char*)wptr + sizeof(dhdr), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(dhdr));
// #endif
// #if !(defined RHM_WRONLY || defined RHM_RDONLY)
-// u_int16_t fid = _emap.get_remove_fid(dtok->rid());
+// u_int16_t fid = _emap.get_remove_fid(dtokp->rid());
// _wrfc.decr_enqcnt(fid);
// #endif
// _pg_offset_dblks++;
// _cached_offset_dblks++;
// // TODO: Incorrect - must set state to DEQ_CACHED; DEQ_SUBM is set when AIO returns.
-// dtok->set_wstate(data_tok::DEQ_SUBM);
-// _page_cb_arr[_pg_index]._pdtokl->push_back(dtok);
+// dtokp->set_wstate(data_tok::DEQ_SUBM);
+// _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
// if (_wrfc.empty()) // Has the file_hdr been written?
// write_fhdr(rid, _wrfc.index(), JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
// if (_pg_offset_dblks >= JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
@@ -374,6 +387,12 @@
}
const iores
+wmgr::dequeue_tx(data_tok* /*dtokp*/, const std::string& /*xid*/) throw (jexception)
+{
+ return RHM_IORES_NOTIMPL;
+}
+
+const iores
wmgr::flush()
{
iores res = write_flush();
@@ -549,7 +568,7 @@
}
const iores
-wmgr::pre_write_check(bool enqueue, data_tok* dtok) throw (jexception)
+wmgr::pre_write_check(bool enqueue, data_tok* dtokp) throw (jexception)
{
// Check status of current file
if (!_wrfc.is_reset())
@@ -575,12 +594,12 @@
}
// Check state of data_tok
- bool res = enqueue ? dtok->is_writable() : dtok->is_dequeueable();
+ bool res = enqueue ? dtokp->is_writable() : dtokp->is_dequeueable();
if (!res)
{
std::stringstream ss;
- ss << "op=" << (enqueue ? "enqueue" : "dequeue") << " dtok_id=" << dtok->id();
- ss << " dtok_state=" << dtok->wstate_str();
+ ss << "op=" << (enqueue ? "enqueue" : "dequeue") << " dtok_id=" << dtokp->id();
+ ss << " dtok_state=" << dtokp->wstate_str();
throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, ss.str(), "wmgr", "pre_write_check");
}
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-09-30 14:33:44 UTC (rev 954)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-10-01 01:34:21 UTC (rev 955)
@@ -87,15 +87,20 @@
void initialize(std::deque<data_tok*>* const dtokl, aio_cb wr_cb,
const u_int32_t max_dtokpp, const u_int32_t max_iowait_us) throw (jexception);
- const iores enqueue(const void* const mbuf, const size_t mlen, data_tok* dtok)
+ const iores enqueue(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, data_tok* dtokp, const bool transient)
throw (jexception);
- const iores dequeue(data_tok* dtok) throw (jexception);
+ const iores enqueue_tx(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, data_tok* dtokp, const std::string& xid,
+ const bool transient) throw (jexception);
+ const iores dequeue(data_tok* dtokp) throw (jexception);
+ const iores dequeue_tx(data_tok* dtokp, const std::string& xid) throw (jexception);
const iores flush();
const u_int32_t get_events(page_state state) throw (jexception);
private:
void initialize() throw (jexception);
- const iores pre_write_check(bool enqueue, data_tok* dtok) throw (jexception);
+ const iores pre_write_check(bool enqueue, data_tok* dtokp) throw (jexception);
const iores write_flush();
const iores rotate_file();
void dblk_roundup();
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-09-30 14:33:44 UTC (rev 954)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-10-01 01:34:21 UTC (rev 955)
@@ -77,7 +77,7 @@
CPPUNIT_TEST(EncodeTest_024);
CPPUNIT_TEST(EncodeTest_025);
CPPUNIT_TEST(EncodeTest_026);
- CPPUNIT_TEST(EncodeTest_027);
+// CPPUNIT_TEST(EncodeTest_027); // Until race condition fixed
// CPPUNIT_TEST(EncodeTest_028); // Until race condition fixed
CPPUNIT_TEST_SUITE_END();
@@ -721,8 +721,8 @@
CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->enqueue_data_record(msg.c_str(), msg.size(), dtp), jc,
- aio_sleep_cnt, dtp));
+ while (handle_jcntl_response(jc->enqueue_data_record(msg.c_str(), msg.size(), msg.size(),
+ dtp, false), jc, aio_sleep_cnt, dtp));
}
void deq_msg(rhm::journal::jcntl* jc, u_int64_t rid)
Modified: store/trunk/cpp/tests/jrnl/msg_producer.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_producer.cpp 2007-09-30 14:33:44 UTC (rev 954)
+++ store/trunk/cpp/tests/jrnl/msg_producer.cpp 2007-10-01 01:34:21 UTC (rev 955)
@@ -131,7 +131,7 @@
bool written = false;
while (!written)
{
- rhm::journal::iores eres = jc.enqueue_data_record(msg, size, dtokp);
+ rhm::journal::iores eres = jc.enqueue_data_record(msg, size, size, dtokp, false);
rhm::journal::data_tok::write_state ws = dtokp->wstate();
const char* wsstr = dtokp->wstate_str();
switch (eres)
Modified: store/trunk/cpp/tests/jrnl/rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/rtest 2007-09-30 14:33:44 UTC (rev 954)
+++ store/trunk/cpp/tests/jrnl/rtest 2007-10-01 01:34:21 UTC (rev 955)
@@ -29,8 +29,8 @@
TEMP_V_FILE=/tmp/v.txt
NUM_JFILES=8
-VG_NORM_FILESIZE=11
-#VG_NORM_FILESIZE=18 # RHEL5 triggers extra valgrind messages when pthreads are in use
+#VG_NORM_FILESIZE=11
+VG_NORM_FILESIZE=18 # RHEL5 triggers extra valgrind messages when pthreads are in use
# Write test
W_DO_TEST=T
17 years, 2 months