Author: kpvdr
Date: 2007-09-21 16:43:29 -0400 (Fri, 21 Sep 2007)
New Revision: 939
Modified:
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rrfc.hpp
store/trunk/cpp/tests/jrnl/Makefile.rtest
store/trunk/cpp/tests/jrnl/jrnl_scope_test.cpp
Log:
Solved rmgr::skip() bugs which caused JERR_RMGR_UNKNOWNMAGIC and other errors.
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-09-21 19:12:19 UTC (rev 938)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-09-21 20:43:29 UTC (rev 939)
@@ -45,8 +45,6 @@
u_int64_t data_tok::_cnt = 0;
data_tok::data_tok():
- // FIXME: Make this thread safe!
- _icnt(_cnt++),
_wstate(NONE),
_rstate(UNREAD),
_dsize(0),
@@ -54,10 +52,17 @@
_dblks_read(0),
_rid(0),
_sourceMsg(NULL)
-{}
+{
+ pthread_mutex_init(&_mutex, NULL);
+ pthread_mutex_lock(&_mutex);
+ _icnt = _cnt++;
+ pthread_mutex_unlock(&_mutex);
+}
data_tok::~data_tok()
-{}
+{
+ pthread_mutex_destroy(&_mutex);
+}
const char*
data_tok::wstate_str() const
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-09-21 19:12:19 UTC (rev 938)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-09-21 20:43:29 UTC (rev 939)
@@ -49,6 +49,7 @@
}
}
+#include <pthread.h>
#include <sys/types.h>
#include <jrnl/jexception.hpp>
@@ -87,6 +88,7 @@
};
private:
+ pthread_mutex_t _mutex;
static u_int64_t _cnt;
u_int64_t _icnt;
write_state _wstate; ///< Enqueued / dequeued state of data
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-09-21 19:12:19 UTC (rev 938)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-09-21 20:43:29 UTC (rev 939)
@@ -71,11 +71,12 @@
//std::cout << " rmgr::read()" << std::flush;
if (_aio_evt_rem)
get_events();
-//std::cout << " [a pi=" << _pg_index << " d=" << dblks_rem() << " f=" << (_rrfc.empty()?"T":"F") << "]" << std::flush;
+//std::cout << " [a pi=" << _pg_index << " d=" << dblks_rem() << " c=" << (_rrfc.is_compl()?"T":"F") << " wc=" << (_rrfc.is_wr_compl()?"T":"F") << "]" << std::flush;
// if(dblks_rem() == 0 && _rrfc.is_full())
- if(dblks_rem() == 0 && _rrfc.is_compl())
+// if(dblks_rem() == 0 && _rrfc.is_compl())
+ if(dblks_rem() == 0 && _rrfc.is_compl() && _rrfc.is_wr_compl())
{
- aio_cycle(); // check if any AIOs have returned
+// aio_cycle(); // check if any AIOs have returned
return RHM_IORES_EMPTY;
}
//std::cout << " b" << std::flush;
@@ -111,7 +112,8 @@
{
//std::cout << " [f pi=" << _pg_index << " d=" << dblks_rem() << " f=" << (_rrfc.empty()?"T":"F") << "]" << std::flush;
// if(dblks_rem() == 0 && _rrfc.is_full())
- if(dblks_rem() == 0 && _rrfc.is_compl())
+// if(dblks_rem() == 0 && _rrfc.is_compl())
+ if(dblks_rem() == 0 && _rrfc.is_compl() && _rrfc.is_wr_compl())
{
aio_cycle(); // check if any AIOs have returned
return RHM_IORES_EMPTY;
@@ -190,6 +192,10 @@
{
//std::cout << " %" << std::flush;
// skip this record, it is already dequeued
+ u_int64_t* sizep = (u_int64_t*)((char*)rptr + sizeof(hdr));
+//u_int32_t dblks = jrec::size_dblks((size_t)*sizep + sizeof(enq_hdr) + sizeof(enq_tail));
+//std::cout << "{" << *sizep << "=" << dblks << "d}" << std::flush;
+ dtokp->set_dsize((size_t)*sizep);
skip(dtokp);
}
break;
@@ -287,7 +293,7 @@
aio_cycle(); // check if any AIOs have returned
return RHM_IORES_AIO_WAIT;
}
-//std::cout << " V" << std::flush;
+//std::cout << " V" << dtokp->dblocks_read() << "," << dblks_rem() << std::flush;
// Read data from this page, first block will have header and data size.
u_int32_t dblks_rd = _data_rec.decode(h, rptr, dtokp->dblocks_read(),
dblks_rem());
@@ -346,18 +352,19 @@
const iores
rmgr::skip(data_tok* dtokp) throw (jexception)
{
+ u_int32_t dsize_dblks = jrec::size_dblks(dtokp->dsize() + sizeof(enq_hdr) + sizeof(enq_tail));
u_int32_t tot_dblk_cnt = dtokp->rstate() == data_tok::SKIP_PART ?
dtokp->dblocks_read() : 0;
//std::cout << " S" << tot_dblk_cnt << std::flush;
while (true)
{
u_int32_t this_dblk_cnt = 0;
- if (_data_rec.rec_size_dblks() - this_dblk_cnt > dblks_rem())
+ if (dsize_dblks - tot_dblk_cnt > dblks_rem())
//{std::cout << "-1" << std::flush;
this_dblk_cnt = dblks_rem();
//}
else
//{std::cout << "-2" << std::flush;
- this_dblk_cnt = _data_rec.rec_size_dblks() - this_dblk_cnt;
+ this_dblk_cnt = dsize_dblks - tot_dblk_cnt;
//}
//std::cout << "->" << this_dblk_cnt << std::flush;
dtokp->incr_dblocks_read(this_dblk_cnt);
@@ -380,6 +387,8 @@
{
// Skip complete, put state back to unread
dtokp->set_rstate(data_tok::UNREAD);
+ dtokp->set_dsize(0);
+ dtokp->set_dblocks_read(0);
return RHM_IORES_SUCCESS;
}
}
Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-09-21 19:12:19 UTC (rev 938)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-09-21 20:43:29 UTC (rev 939)
@@ -124,6 +124,7 @@
inline const u_int32_t aio_outstanding_dblks()
{ return _curr_fh->rd_aio_outstanding_dblks(); }
inline const bool file_rotate() const { return _curr_fh->rd_file_rotate(); }
+ inline const bool is_wr_compl() const { return _curr_fh->is_wr_compl(); }
}; // class rrfc
} // namespace journal
Modified: store/trunk/cpp/tests/jrnl/Makefile.rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-21 19:12:19 UTC (rev 938)
+++ store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-21 20:43:29 UTC (rev 939)
@@ -104,6 +104,7 @@
jtest: $(JRNL_OBJ_FILES) $(JTEST_OBJ_FILES)
+jrnl_scope_test: RHM_DEFINES = -DRHM_CLEAN
jrnl_scope_test: $(JRNL_OBJ_FILES) jrnl_scope_test.cpp
jrtest: RHM_DEFINES = -DRHM_JOWRITE -DRHM_RDONLY -DRHM_TESTVALS
Modified: store/trunk/cpp/tests/jrnl/jrnl_scope_test.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jrnl_scope_test.cpp 2007-09-21 19:12:19 UTC (rev 938)
+++ store/trunk/cpp/tests/jrnl/jrnl_scope_test.cpp 2007-09-21 20:43:29 UTC (rev 939)
@@ -6,16 +6,18 @@
using namespace std;
#define TEST_ITERATIONS 5
-#define NUM_TESTS 14
+#define NUM_TESTS 16
#define NUM_MSGS 5
+#define MAX_MSG_SIZE 127
#define MAX_AIO_SLEEPS 500
#define AIO_SLEEP_TIME 1000
class jrnl_scope_test
{
static const char* iores_str[];
+ std::string _msg;
public:
- jrnl_scope_test() {}
+ jrnl_scope_test() {_msg.reserve(MAX_MSG_SIZE+1);}
~jrnl_scope_test() {}
int run()
@@ -61,6 +63,10 @@
return test_13(num_iter);
case 14:
return test_14(num_iter);
+ case 15:
+ return test_15(num_iter);
+ case 16:
+ return test_16(num_iter);
default:
cout << " unknown test: " << test_num <<
endl;
return 1;
@@ -216,12 +222,8 @@
rhm::journal::jcntl jc("scope_test_07", "jdata",
"t07");
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
- {
- std::stringstream ss;
- ss << "Message " << m;
- if (enq_msg(&jc, ss.str().c_str(), ss.str().size()))
+ if (enq_msg(&jc, create_msg(_msg, m)))
return 1;
- }
}
}
cout << " ok" << endl;
@@ -243,15 +245,11 @@
}
jc->initialize();
for (int m=0; m<NUM_MSGS; m++)
- {
- std::stringstream ss;
- ss << "Message " << m;
- if (enq_msg(jc, ss.str().c_str(), ss.str().size()))
+ if (enq_msg(jc, create_msg(_msg, m)))
{
delete jc;
return 1;
}
- }
delete jc;
}
}
@@ -268,19 +266,15 @@
{
rhm::journal::jcntl jc("scope_test_09", "jdata",
"t09");
jc.initialize();
- for (int m=0; m<NUM_MSGS; m++) // 12288
- {
- std::stringstream ss;
- ss << "Message " << m;
- if (enq_msg(&jc, ss.str().c_str(), ss.str().size()))
+ for (int m=0; m<NUM_MSGS; m++) // 12288 - fills 1 file
+ if (enq_msg(&jc, create_msg(_msg, m)))
return 1;
- }
}
{
rhm::journal::jcntl jc("scope_test_09", "jdata",
"t09");
jc.recover();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(&jc))
+ if (read_msg(m, &jc))
return 1;
}
}
@@ -303,15 +297,11 @@
}
jc->initialize();
for (int m=0; m<NUM_MSGS; m++)
- {
- std::stringstream ss;
- ss << "Message " << m;
- if (enq_msg(jc, ss.str().c_str(), ss.str().size()))
+ if (enq_msg(jc, create_msg(_msg, m)))
{
delete jc;
return 1;
}
- }
delete jc;
}
{
@@ -323,7 +313,7 @@
}
jc->recover();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(jc))
+ if (read_msg(m, jc))
{
delete jc;
return 1;
@@ -345,22 +335,18 @@
rhm::journal::jcntl jc("scope_test_11", "jdata",
"t11");
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
- {
- std::stringstream ss;
- ss << "Message " << m;
- if (enq_msg(&jc, ss.str().c_str(), ss.str().size()))
+ if (enq_msg(&jc, create_msg(_msg, m)))
return 1;
- }
}
{
rhm::journal::jcntl jc("scope_test_11", "jdata",
"t11");
jc.recover();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(&jc))
+ if (read_msg(m, &jc))
return 1;
jc.recovered();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(&jc))
+ if (read_msg(m, &jc))
return 1;
}
}
@@ -383,15 +369,11 @@
}
jc->initialize();
for (int m=0; m<NUM_MSGS; m++)
- {
- std::stringstream ss;
- ss << "Message " << m;
- if (enq_msg(jc, ss.str().c_str(), ss.str().size()))
+ if (enq_msg(jc, create_msg(_msg, m)))
{
delete jc;
return 1;
}
- }
delete jc;
}
{
@@ -403,14 +385,14 @@
}
jc->recover();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(jc))
+ if (read_msg(m, jc))
{
delete jc;
return 1;
}
jc->recovered();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(jc))
+ if (read_msg(m, jc))
{
delete jc;
return 1;
@@ -432,22 +414,18 @@
rhm::journal::jcntl jc("scope_test_13", "jdata",
"t13");
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
- {
- std::stringstream ss;
- ss << "Message " << m;
- if (enq_msg(&jc, ss.str().c_str(), ss.str().size()))
+ if (enq_msg(&jc, create_msg(_msg, m)))
return 1;
- }
}
{
rhm::journal::jcntl jc("scope_test_13", "jdata",
"t13");
jc.recover();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(&jc))
+ if (read_msg(m, &jc))
return 1;
jc.recovered();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(&jc))
+ if (read_msg(m, &jc))
return 1;
for (int m=0; m<NUM_MSGS; m++)
if (deq_msg(&jc, m))
@@ -473,15 +451,11 @@
}
jc->initialize();
for (int m=0; m<NUM_MSGS; m++)
- {
- std::stringstream ss;
- ss << "Message " << m;
- if (enq_msg(jc, ss.str().c_str(), ss.str().size()))
+ if (enq_msg(jc, create_msg(_msg, m)))
{
delete jc;
return 1;
}
- }
delete jc;
}
{
@@ -493,14 +467,14 @@
}
jc->recover();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(jc))
+ if (read_msg(m, jc))
{
delete jc;
return 1;
}
jc->recovered();
for (int m=0; m<NUM_MSGS; m++)
- if (read_msg(jc))
+ if (read_msg(m, jc))
{
delete jc;
return 1;
@@ -518,10 +492,148 @@
return 0;
}
- int enq_msg(rhm::journal::jcntl* jc, const char* msg, const size_t msg_size)
+ int test_15(int num_iter)
{
+ cout << " 15. complex recover: ";
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ {
+ rhm::journal::jcntl jc("scope_test_15", "jdata", "t15");
+ jc.initialize();
+ // enqueue msgs, read then dequeue half
+ // rids 0-9
+ for (int m=0; m<NUM_MSGS*2; m++)
+ if (enq_msg(&jc, create_msg(_msg, m)))
+ return 1;
+ // rids 10-14 dequeueing rids 0-4
+ for (int m=0; m<NUM_MSGS; m++)
+ if (deq_msg(&jc, m))
+ return 1;
+ jc.flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ if (read_msg(m, &jc))
+ return 1;
+ }
+ {
+ rhm::journal::jcntl jc("scope_test_15", "jdata", "t15");
+ jc.recover();
+ // recover other half of messages
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ if (read_msg(m, &jc))
+ return 1;
+ jc.recovered();
+ // write more messages, read and dequeue all
+ // rids 15-19
+ for (int m=NUM_MSGS*2; m<NUM_MSGS*3; m++)
+ if (enq_msg(&jc, create_msg(_msg, m)))
+ return 1;
+ jc.flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*3; m++)
+ if (read_msg(m, &jc))
+ return 1;
+ // rids 20-29 dequeueing rids 5-9, 15-19
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ if (deq_msg(&jc, m))
+ return 1;
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ if (deq_msg(&jc, m))
+ return 1;
+ }
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int test_16(int num_iter)
+ {
+ cout << " 16. complex recover ptr: ";
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ {
+ rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_16", "jdata", "t16");
+ if (!jc)
+ {
+ cout << "failed" << endl;
+ return 1;
+ }
+ jc->initialize();
+ // enqueue msgs, read then dequeue half
+ for (int m=0; m<NUM_MSGS*2; m++)
+ if (enq_msg(jc, create_msg(_msg, m)))
+ {
+ delete jc;
+ return 1;
+ }
+ for (int m=0; m<NUM_MSGS; m++)
+ if (deq_msg(jc, m))
+ {
+ delete jc;
+ return 1;
+ }
+ jc->flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ if (read_msg(m, jc))
+ {
+ delete jc;
+ return 1;
+ }
+ delete jc;
+ }
+ {
+ rhm::journal::jcntl* jc = new rhm::journal::jcntl("scope_test_16", "jdata", "t16");
+ if (!jc)
+ {
+ cout << "failed" << endl;
+ return 1;
+ }
+ jc->recover();
+ // recover other half of messages
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ if (read_msg(m, jc))
+ {
+ delete jc;
+ return 1;
+ }
+ jc->recovered();
+ // write more messages, read and dequeue all
+ for (int m=NUM_MSGS*2; m<NUM_MSGS*3; m++)
+ if (enq_msg(jc, create_msg(_msg, m)))
+ {
+ delete jc;
+ return 1;
+ }
+ jc->flush();
+ for (int m=NUM_MSGS; m<NUM_MSGS*3; m++)
+ if (read_msg(m, jc))
+ {
+ delete jc;
+ return 1;
+ }
+ for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+ if (deq_msg(jc, m))
+ {
+ delete jc;
+ return 1;
+ }
+ for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
+ if (deq_msg(jc, m))
+ {
+ delete jc;
+ return 1;
+ }
+ delete jc;
+ }
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int enq_msg(rhm::journal::jcntl* jc, const std::string msg)
+ {
rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- rhm::journal::iores res = jc->enqueue_data(msg, msg_size, dtp);
+ rhm::journal::iores res = jc->enqueue_data(msg.c_str(), msg.size(), dtp);
if (res != rhm::journal::RHM_IORES_SUCCESS)
{
cout << "jcntl::enqueue_data() returned " <<
iores_str[res] << ". failed" << endl;
@@ -546,11 +658,11 @@
return 0;
}
- int read_msg(rhm::journal::jcntl* jc)
+ int read_msg(int msg_num, rhm::journal::jcntl* jc)
{
- char buff[128];
- memset(buff, '?', 127);
- buff[127] = '\0';
+ char buff[MAX_MSG_SIZE + 1];
+ memset(buff, '?', MAX_MSG_SIZE);
+ buff[MAX_MSG_SIZE] = '\0';
rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
dtp->set_wstate(rhm::journal::data_tok::ENQ);
unsigned aio_sleep_cnt = 0;
@@ -561,6 +673,7 @@
switch (res)
{
case rhm::journal::RHM_IORES_SUCCESS:
+ buff[dtp->dsize()] = '\0';
read = true;
break;
case rhm::journal::RHM_IORES_AIO_WAIT:
@@ -570,6 +683,7 @@
delete dtp;
return 1;
}
+ jc->get_wr_events();
::usleep(AIO_SLEEP_TIME);
break;
default:
@@ -578,8 +692,26 @@
return 1;
}
}
+ std::string s;
+ create_msg(s, msg_num);
+ if (s.compare(buff) != 0)
+ {
+ cout << "Message comparison failure: read \"" << buff << "\", expected \"" << s <<
+ "\"." << endl;
+ return 1;
+ }
return 0;
}
+
+ static std::string& create_msg(std::string& s, int msg_num)
+ {
+ std::stringstream ss;
+ ss << "Message_" << std::setfill('0') << std::setw(4) << msg_num;
+ ss << "_4567890123456789012345678901234567890";
+ ss << "12345678901234567890123456789012345678901234567890"; // 100 chars long (2 dblks)
+ s.assign(ss.str());
+ return s;
+ }
};
int main(int argc, char** argv)