Author: kpvdr
Date: 2007-11-14 17:20:42 -0500 (Wed, 14 Nov 2007)
New Revision: 1315
Modified:
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/tests/jrnl/Makefile.rtest
store/trunk/cpp/tests/jrnl/jtest.cpp
store/trunk/cpp/tests/jrnl/jtest.hpp
store/trunk/cpp/tests/jrnl/msg_consumer.cpp
store/trunk/cpp/tests/jrnl/msg_producer.cpp
store/trunk/cpp/tests/jrnl/msg_producer.hpp
Log:
Fixed some bugs in test framewrok
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-11-14 20:35:07 UTC (rev 1314)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-11-14 22:20:42 UTC (rev 1315)
@@ -216,7 +216,7 @@
rmgr::read(void** const datapp, size_t& dsize, void** const xidpp, size_t&
xidsize, bool& transient,
bool& external, data_tok* dtokp) throw (jexception)
{
-//std::cout << " rmgr::read() ro=" <<
(_jc->is_read_only()?"T":"F") << " po=" <<
_pg_offset_dblks << " ems=" << _emap.size() << std::flush;
+//std::cout << " rmgr::read() ro=" <<
(_jc->is_read_only()?"T":"F") << " po=" <<
_pg_offset_dblks << " ems=" << _emap.size() << "
tms=" << _tmap.size() << std::flush;
iores res = pre_read_check(dtokp);
if (res != RHM_IORES_SUCCESS)
Modified: store/trunk/cpp/tests/jrnl/Makefile.rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-11-14 20:35:07 UTC (rev 1314)
+++ store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-11-14 22:20:42 UTC (rev 1315)
@@ -77,7 +77,7 @@
CXX = g++
CXXINCLUDES = -I. -I../../lib -I../../../../qpid/cpp/src -I../../../../qpid/cpp/src/gen
-CXXFLAGS = $(RHM_DEFINES) -Wall -Wextra -Werror -Wno-shadow -Wpointer-arith -Wcast-qual
-Wcast-align -Wno-long-long -Wvolatile-register-var -Winvalid-pch -Wno-system-headers
-pedantic -ggdb -O0 -pthread $(CXXINCLUDES)
+CXXFLAGS = $(RHM_DEFINES) -Wall -Wextra -Werror -Wno-shadow -Wpointer-arith -Wcast-qual
-Wcast-align -Wno-long-long -Wvolatile-register-var -Winvalid-pch -Wno-system-headers
-pedantic -g -O0 -pthread $(CXXINCLUDES)
LDFLAGS = -lpthread -laio -lrt -lqpidcommon -L$(QPID_HOME_DIR)/cpp/src/.libs
.SUFFIXES:
Modified: store/trunk/cpp/tests/jrnl/jtest.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtest.cpp 2007-11-14 20:35:07 UTC (rev 1314)
+++ store/trunk/cpp/tests/jrnl/jtest.cpp 2007-11-14 22:20:42 UTC (rev 1315)
@@ -217,6 +217,9 @@
gettimeofday(_end_time, _tz_ptr);
string str;
cout << "run(): " << report_time(str) << endl;
+ double rate = c_args->_num_msgs / time_diff(_start_time, _end_time) / 1000.0;
+ cout.precision(3);
+ cout << "rate = " << fixed << rate << "
kmsgs/sec" << endl;
#endif
}
@@ -250,11 +253,16 @@
_dtok_master_list.clear();
}
+double
+jtest::time_diff(const struct timeval* start, const struct timeval* end)
+{
+ return end->tv_sec - start->tv_sec + (end->tv_usec -
start->tv_usec)/1e6;
+}
+
string&
jtest::report_time(string& str) const
{
- double diff = _end_time->tv_sec - _start_time->tv_sec +
- (_end_time->tv_usec - _start_time->tv_usec)/1e6;
+ double diff = time_diff(_start_time, _end_time);
stringstream ss;
ss.precision(6);
ss << fixed << diff << " sec";
Modified: store/trunk/cpp/tests/jrnl/jtest.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtest.hpp 2007-11-14 20:35:07 UTC (rev 1314)
+++ store/trunk/cpp/tests/jrnl/jtest.hpp 2007-11-14 22:20:42 UTC (rev 1315)
@@ -100,6 +100,7 @@
static msg_consumer _mc;
private:
+ static double time_diff(const struct timeval* start, const struct timeval* end);
string& report_time(string& str) const;
// AIO callback functions
Modified: store/trunk/cpp/tests/jrnl/msg_consumer.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_consumer.cpp 2007-11-14 20:35:07 UTC (rev 1314)
+++ store/trunk/cpp/tests/jrnl/msg_consumer.cpp 2007-11-14 22:20:42 UTC (rev 1315)
@@ -290,10 +290,15 @@
}
if (chk_xid_data)
{
+ int xid_num_size = xid_size - 5 > 16 ? 16 : xid_size - 5;
+ std::stringstream xid;
+ xid << "xid:" << std::hex <<
std::setfill('0') << std::setw(xid_num_size) << msgCntr <<
":";
+ for (unsigned i = xid_num_size + 5; i < xid_size; i++)
+ xid << (char)('a' + (i%26));
for (unsigned i=0; i<xid_size; i++)
{
char found = ((char*)xidp)[i];
- char expected = 'a' + (((msgCntr%26) + i) % 26);
+ char expected = xid.str().at(i);
if (found != expected)
{
std::stringstream ss;
Modified: store/trunk/cpp/tests/jrnl/msg_producer.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_producer.cpp 2007-11-14 20:35:07 UTC (rev 1314)
+++ store/trunk/cpp/tests/jrnl/msg_producer.cpp 2007-11-14 22:20:42 UTC (rev 1315)
@@ -67,7 +67,6 @@
{
instance_cnt++;
init_msg_buff();
- init_xid_buff();
_aio_cmpl_dtok_list.clear();
_dd_dtok_list.clear();
}
@@ -117,9 +116,6 @@
if (maxMsgSize > _msg_buff_size)
throw rhm::journal::jexception(EXCEPTION_BASE+0,
"Message size exceeds internal buffer limit",
"msg_producer", "produce");
- if (maxXidSize > _xid_buff_size)
- throw rhm::journal::jexception(EXCEPTION_BASE+1,
- "XID size exceeds internal buffer limit",
"msg_producer", "produce");
{
//std::cout << "[" << _num_msgs << "]" <<
std::flush;
@@ -133,7 +129,12 @@
size_t sizeRange = maxXidSize - minXidSize + 1;
xid_size = minXidSize + (int)(1.0*rand()*sizeRange/(RAND_MAX + 1.0));
}
- const std::string xid((char*)_xid_buff + (msgCntr%26), xid_size);
+ int xid_num_size = xid_size - 5 > 16 ? 16 : xid_size - 5;
+ std::stringstream xid;
+ xid << "xid:" << std::hex <<
std::setfill('0');
+ xid << std::setw(xid_num_size) << msgCntr <<
":";
+ for (unsigned i = xid_num_size + 5; i < xid_size; i++)
+ xid << (char)('a' + (i%26));
size_t size = maxMsgSize;
if (minMsgSize < maxMsgSize)
{
@@ -150,9 +151,10 @@
if (maxXidSize)
{
if (external)
- eres = jc.enqueue_extern_txn_data_record(size, dtokp, xid,
transient);
+ eres = jc.enqueue_extern_txn_data_record(size, dtokp, xid.str(),
transient);
else
- eres = jc.enqueue_txn_data_record(msg, size, size, dtokp, xid,
transient);
+ eres = jc.enqueue_txn_data_record(msg, size, size, dtokp,
xid.str(),
+ transient);
}
else
{
@@ -220,6 +222,46 @@
//print_dbug(msgCntr, size, (char*)msg, true);
}
+ // If transactional, write commit
+ if (xid_size)
+ {
+ written = false;
+ // Create new data_tok, push to back of maseter list
+ rhm::journal::data_tok* txn_dtokp = new rhm::journal::data_tok;
+ _dtok_master_list.push_back(txn_dtokp);
+//std::cout << " T" << dtokp->id() << " "
<< std::flush;
+ while (!written)
+ {
+ rhm::journal::iores dres;
+ dres = _jcptr->txn_commit(txn_dtokp, dtokp->xid());
+
+ const char* wsstr = dtokp->wstate_str();
+ switch (dres)
+ {
+ case rhm::journal::RHM_IORES_SUCCESS:
+ written = true;
+ _num_txn_subm++;
+ break;
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ if (aio_sleep_cnt >= MAX_AIO_SLEEPS)
+ throw rhm::journal::jexception(EXCEPTION_BASE+8,
+ "Page cache full (AIO events outstanding for
all pages); "
+ "exceeded wait time for pages to
free.", "msg_producer",
+ "send_deferred_dequeues");
+ jc.get_wr_events();
+ usleep(AIO_SLEEP_TIME);
+ aio_sleep_cnt++;
+ break;
+ default:
+ std::stringstream ss;
+ ss << "msg_producer::send_deferred_dequeues()
"
+ "Unexpected msg state: id=" <<
dtokp->id() << " ws=" <<
+ wsstr << " res=" <<
iores_str[dres];
+ throw rhm::journal::jexception(EXCEPTION_BASE+9,
ss.str().c_str(),
+ "msg_producer",
"send_deferred_dequeues");
+ }
+ }
+ }
// Submit deferred dequeues (if any)
if (_auto_dequeue)
@@ -272,21 +314,21 @@
if (st == rhm::journal::data_tok::ENQ)
{
_num_msgs_enq++;
-//std::cout << ">E" << dtokp->id() << " "
<< std::flush;
+//std::cout << " >E" << dtokp->id() << " "
<< std::flush;
_dd_dtok_list.push_back(dtokp);
}
else if (dtokp->wstate() == rhm::journal::data_tok::DEQ)
{
-//std::cout << ">D" << dtokp->id() << " "
<< std::flush;
+//std::cout << " >D" << dtokp->id() << " "
<< std::flush;
_num_msgs_deq++;
}
else if (dtokp->wstate() == rhm::journal::data_tok::ABORTED ||
dtokp->wstate() == rhm::journal::data_tok::COMMITTED)
{
-//std::cout << ">T" << dtokp->id() << " "
<< std::flush;
+//std::cout << " >T" << dtokp->id() << " "
<< std::flush;
_num_txn++;
}
-//else std::cout << ">?" << dtokp->id() << "
st=" << dtokp->wstate_str() << " " << std::flush;
+//else std::cout << " >?" << dtokp->id() << "
st=" << dtokp->wstate_str() << " " << std::flush;
this_dtok_list.pop_front();
}
}
@@ -305,13 +347,6 @@
}
void
-msg_producer::init_xid_buff()
-{
- for (unsigned int i=0; i<_xid_buff_size; i++)
- _xid_buff[i] = 'a' + i%26;
-}
-
-void
msg_producer::send_deferred_dequeues(rhm::journal::jcntl& jc)
{
for (unsigned i=0; i<_dd_dtok_list.size(); i++)
@@ -325,86 +360,47 @@
{
//std::cout << " D" << ddtokp->id() << " "
<< std::flush;
_dd_dtok_list.erase(_dd_dtok_list.begin() + i--);
- unsigned aio_sleep_cnt = 0;
- bool written = false;
- while (!written)
+ unsigned aio_sleep_cnt;
+ bool written;
+ if (_auto_dequeue)
{
- rhm::journal::iores dres;
- if (ddtokp->has_xid())
- dres = _jcptr->dequeue_txn_data_record(ddtokp, ddtokp->xid());
- else
- dres = _jcptr->dequeue_data_record(ddtokp);
-
- const char* wsstr = ddtokp->wstate_str();
- switch (dres)
- {
- case rhm::journal::RHM_IORES_SUCCESS:
- written = true;
- _num_msgs_deq_subm++;
- break;
- case rhm::journal::RHM_IORES_AIO_WAIT:
- if (aio_sleep_cnt >= MAX_AIO_SLEEPS)
- throw rhm::journal::jexception(EXCEPTION_BASE+6,
- "Page cache full (AIO events outstanding for all
pages); "
- "exceeded wait time for pages to free.",
"msg_producer",
- "send_deferred_dequeues");
-//std::cout << "$" << dres << " " <<
std::flush;
- jc.get_wr_events();
- usleep(AIO_SLEEP_TIME);
- aio_sleep_cnt++;
- break;
- default:
- std::stringstream ss;
- ss << "msg_producer::send_deferred_dequeues()
Unexpected msg state: id=" <<
- ddtokp->id() << " ws=" << wsstr
<< " res=" << iores_str[dres];
- throw rhm::journal::jexception(EXCEPTION_BASE+7,
ss.str().c_str(),
- "msg_producer",
"send_deferred_dequeues");
- }
- }
-
- // If transactional, commit for even rids, abort for odd rids
- if (ddtokp->has_xid())
- {
+ aio_sleep_cnt = 0;
written = false;
- // Create new data_tok, push to back of maseter list
- rhm::journal::data_tok* txn_dtokp = new rhm::journal::data_tok;
- _dtok_master_list.push_back(txn_dtokp);
-//std::cout << " T" << ddtokp->id() << " "
<< std::flush;
while (!written)
{
rhm::journal::iores dres;
- if (ddtokp->rid()%2)
- dres = _jcptr->txn_abort(txn_dtokp, ddtokp->xid());
+ if (ddtokp->has_xid())
+ dres = _jcptr->dequeue_txn_data_record(ddtokp,
ddtokp->xid());
else
- dres = _jcptr->txn_commit(txn_dtokp, ddtokp->xid());
-
+ dres = _jcptr->dequeue_data_record(ddtokp);
+
const char* wsstr = ddtokp->wstate_str();
switch (dres)
{
case rhm::journal::RHM_IORES_SUCCESS:
written = true;
- _num_txn_subm++;
+ _num_msgs_deq_subm++;
break;
case rhm::journal::RHM_IORES_AIO_WAIT:
if (aio_sleep_cnt >= MAX_AIO_SLEEPS)
- throw rhm::journal::jexception(EXCEPTION_BASE+8,
+ throw rhm::journal::jexception(EXCEPTION_BASE+6,
"Page cache full (AIO events outstanding for
all pages); "
"exceeded wait time for pages to
free.", "msg_producer",
"send_deferred_dequeues");
+//std::cout << "$" << dres << " " <<
std::flush;
jc.get_wr_events();
usleep(AIO_SLEEP_TIME);
aio_sleep_cnt++;
break;
default:
std::stringstream ss;
- ss << "msg_producer::send_deferred_dequeues()
"
- "Unexpected msg state: id=" <<
ddtokp->id() << " ws=" <<
- wsstr << " res=" <<
iores_str[dres];
- throw rhm::journal::jexception(EXCEPTION_BASE+9,
ss.str().c_str(),
+ ss << "Unexpected msg state: id=" <<
ddtokp->id();
+ ss << " ws=" << wsstr << "
res=" << iores_str[dres];
+ throw rhm::journal::jexception(EXCEPTION_BASE+7,
ss.str().c_str(),
"msg_producer",
"send_deferred_dequeues");
- }
- }
- }
+ } // switch (dres)
+ } // while (!written)
+ } // if (_auto_dequeue)
}
}
}
@@ -412,6 +408,7 @@
void
msg_producer::jrnl_flush(rhm::journal::jcntl& jc, u_int32_t num_msgs_sent)
{
+std::cout << "msg_producer::jrnl_flush(" << num_msgs_sent <<
")" << std::endl << std::flush;
// Clear any unsent enqueues in buffer, then wait for all enqueues to return
unsigned flush_cnt = 0;
assert(num_msgs_sent == _num_msgs_enq_subm);
@@ -428,7 +425,7 @@
throw rhm::journal::jexception(EXCEPTION_BASE+10, ss.str().c_str(),
"msg_producer",
"jrnl_flush");
}
-//std::cout << "+" << std::flush;
+std::cout << "+" << std::flush;
jc.get_wr_events();
usleep(FLUSH_SLEEPTIME);
flush_cnt++;
@@ -438,7 +435,7 @@
// Submit deferred dequeues (if any)
if (_auto_dequeue)
{
-//std::cout << "auto_dequeue: " << _dd_dtok_list.size() <<
" dequeues waiting. " << std::flush;
+std::cout << "auto_dequeue: " << _dd_dtok_list.size() <<
" dequeues waiting. " << std::flush;
u_int32_t cnt = 0;
send_deferred_dequeues(jc);
while (!_dd_dtok_list.empty() && !_interrupt_flag)
@@ -450,10 +447,11 @@
usleep(1000);
send_deferred_dequeues(jc);
}
-//std::cout << "auto_dequeue: done. " << std::flush;
+std::cout << "auto_dequeue: done. " << std::flush;
// Clear any unsent dequeues in buffer, then wait for all dequeues to return
flush_cnt = 0;
+std::cout << "_num_msgs_deq_subm=" << _num_msgs_deq_subm <<
std::endl << std::flush;
assert(num_msgs_sent == _num_msgs_deq_subm);
if (_num_msgs_deq < _num_msgs_deq_subm || _num_txn < _num_txn_subm)
{
@@ -469,7 +467,7 @@
throw rhm::journal::jexception(EXCEPTION_BASE+12, ss.str().c_str(),
"msg_producer", "jrnl_flush");
}
-//std::cout << "*" << std::flush;
+std::cout << "*" << std::flush;
jc.get_wr_events();
usleep(1000);
flush_cnt++;
@@ -522,8 +520,6 @@
u_int16_t msg_producer::instance_cnt = 0;
const size_t msg_producer::_msg_buff_size = JRNL_DBLK_SIZE * JRNL_SBLK_SIZE *
JRNL_FILE_SIZE * JRNL_NUM_FILES;
-const size_t msg_producer::_xid_buff_size = JRNL_DBLK_SIZE * JRNL_SBLK_SIZE *
JRNL_FILE_SIZE * JRNL_NUM_FILES;
char msg_producer::_msg_buff[msg_producer::_msg_buff_size];
-char msg_producer::_xid_buff[msg_producer::_xid_buff_size];
bool msg_producer::_interrupt_flag = false;
Modified: store/trunk/cpp/tests/jrnl/msg_producer.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_producer.hpp 2007-11-14 20:35:07 UTC (rev 1314)
+++ store/trunk/cpp/tests/jrnl/msg_producer.hpp 2007-11-14 22:20:42 UTC (rev 1315)
@@ -66,8 +66,6 @@
static u_int16_t instance_cnt;
static const size_t _msg_buff_size;
static char _msg_buff[];
- static const size_t _xid_buff_size;
- static char _xid_buff[];
static bool _interrupt_flag;
rhm::journal::jcntl* _jcptr;
u_int32_t _num_msgs;
@@ -106,7 +104,6 @@
private:
void init_msg_buff();
- void init_xid_buff();
void send_deferred_dequeues(rhm::journal::jcntl& jc);
void jrnl_flush(rhm::journal::jcntl& jc, u_int32_t num_msgs_sent);
void print_dbug(u_int32_t msg_cnt, size_t msg_size, char* _msg_buff, bool show_msg =
false) const;