Author: kpvdr
Date: 2007-09-22 11:09:17 -0400 (Sat, 22 Sep 2007)
New Revision: 940
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/jrnl/nlfh.cpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rrfc.hpp
store/trunk/cpp/tests/Makefile.am
store/trunk/cpp/tests/jrnl/Makefile.rtest
Log:
Bugfix: replaced rrfc.is_wr_compl() with rrfc.is_wr_aio_outstanding() to fix logic error
causing premeture RHM_IORES_EMPTY exit on some reads
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-21 20:43:29 UTC (rev 939)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-22 15:09:17 UTC (rev 940)
@@ -409,71 +409,70 @@
dtokp.set_wstate(rhm::journal::data_tok::ENQ);
// read the message from the Journal.
- while (read) {
+ try {
+ while (read) {
std:: cout << "loop -- uses fixed size -> FIX <-" <<
std::endl;
// const iores read_data(void** dbuf, size_t& dbsize, data_tok* const dtok)
- rhm::journal::iores res;
- try {
- res = jc->read_data(&buff, buffSize, &dtokp);
- } catch (rhm::journal::jexception& e) {
- std::cout << "recover read" << e << std::endl;
- std::string str;
- THROW_STORE_EXCEPTION("Error dequeuing message" + e.to_string(str));
- }
- readSize = dtokp.dsize();
- assert(readSize < buffSize); /// fail safe for hack...
+ rhm::journal::iores res = jc->read_data(&buff, buffSize, &dtokp);
+ readSize = dtokp.dsize();
+ assert(readSize < buffSize); /// fail safe for hack...
- switch (res)
- {
- case rhm::journal::RHM_IORES_SUCCESS:{
- msg_count++;
- char* data = buff;
- unsigned headerSize = Buffer(data, preambleLength).getLong();
- Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read
size or header size ????
+ switch (res)
+ {
+ case rhm::journal::RHM_IORES_SUCCESS:{
+ msg_count++;
+ char* data = buff;
+ unsigned headerSize = Buffer(data, preambleLength).getLong();
+ Buffer headerBuff(data+ preambleLength, headerSize); /// do we want
read size or header size ????
- RecoverableMessage::shared_ptr msg = recovery.recoverMessage(headerBuff);
- msg->setPersistenceId(dtokp.rid());
+ RecoverableMessage::shared_ptr msg = recovery.recoverMessage(headerBuff);
+ msg->setPersistenceId(dtokp.rid());
- u_int32_t contentOffset = headerSize + preambleLength;
- u_int64_t contentSize = readSize - contentOffset;
+ u_int32_t contentOffset = headerSize + preambleLength;
+ u_int64_t contentSize = readSize - contentOffset;
- if (msg->loadContent(contentSize)) {
- //now read the content
- Buffer contentBuff(data + contentOffset, contentSize);
- msg->decodeContent(contentBuff);
- }
- if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(),
dtokp.rid())) {
- prepared[dtokp.rid()] = msg;
- } else {
- queue->recover(msg);
- }
+ if (msg->loadContent(contentSize)) {
+ //now read the content
+ Buffer contentBuff(data + contentOffset, contentSize);
+ msg->decodeContent(contentBuff);
+ }
+ if (PreparedTransaction::isLocked(locked,
queue->getPersistenceId(), dtokp.rid())) {
+ prepared[dtokp.rid()] = msg;
+ } else {
+ queue->recover(msg);
+ }
- if (dtokp.rid() > maxMessageId) {
- maxMessageId = dtokp.rid();
- }
+ if (dtokp.rid() > maxMessageId) {
+ maxMessageId = dtokp.rid();
+ }
- dtokp.reset();
- dtokp.set_wstate(rhm::journal::data_tok::ENQ);
- break;
- }
- case rhm::journal::RHM_IORES_AIO_WAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- {
- THROW_STORE_EXCEPTION("Store error, disk time out on recover
for:" + queue->getName());
- }
- ::usleep(AIO_SLEEP_TIME);
- break;
- case rhm::journal::RHM_IORES_EMPTY:
- read = false;
- // inline const u_int32_t get_enq_cnt() const { return _emap.size(); }
- assert (jc->get_enq_cnt() == msg_count);
- break; // done with all messages. ((add call in jrnl to test that _emap is empty.
- default:
- assert( "Store Error: Unexpected msg state");
- }
- }
+ dtokp.reset();
+ dtokp.set_wstate(rhm::journal::data_tok::ENQ);
+ break;
+ }
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ {
+ THROW_STORE_EXCEPTION("Store error, disk time out on recover
for:" + queue->getName());
+ }
+ ::usleep(AIO_SLEEP_TIME);
+ break;
+ case rhm::journal::RHM_IORES_EMPTY:
+ read = false;
+ // inline const u_int32_t get_enq_cnt() const { return _emap.size(); }
+ assert (jc->get_enq_cnt() == msg_count);
+ break; // done with all messages. ((add call in jrnl to test that _emap is empty.
+ default:
+ assert( "Store Error: Unexpected msg state");
+ } // switch
+ } // while
+ } catch (rhm::journal::jexception& e) {
+ std::stringstream ss;
+ ss << e;
+ THROW_STORE_EXCEPTION("Error dequeuing message: " + ss.str());
+ }
messageIdSequence.reset(maxMessageId + 1);
}
Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-09-21 20:43:29 UTC (rev 939)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-09-22 15:09:17 UTC (rev 940)
@@ -371,7 +371,7 @@
nlfh::status_str(std::string& s) const
{
std::stringstream ss;
- ss << "nlfh[" << _fid << "]: ws=" <<
_wr_subm_cnt_dblks << " wc=" << _wr_cmpl_cnt_dblks;
+ ss << "fid=" << _fid << " ws=" <<
_wr_subm_cnt_dblks << " wc=" << _wr_cmpl_cnt_dblks;
ss << " rs=" << _rd_subm_cnt_dblks << " rc="
<< _rd_cmpl_cnt_dblks;
s.assign(ss.str());
return s;
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-09-21 20:43:29 UTC (rev 939)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-09-22 15:09:17 UTC (rev 940)
@@ -71,12 +71,11 @@
//std::cout << " rmgr::read()" << std::flush;
if (_aio_evt_rem)
get_events();
-//std::cout << " [a pi=" << _pg_index << " d="
<< dblks_rem() << " c=" <<
(_rrfc.is_compl()?"T":"F") << " wc=" <<
(_rrfc.is_wr_compl()?"T":"F") << "]" <<
std::flush;
-// if(dblks_rem() == 0 && _rrfc.is_full())
-// if(dblks_rem() == 0 && _rrfc.is_compl())
- if(dblks_rem() == 0 && _rrfc.is_compl() && _rrfc.is_wr_compl())
+//std::string s;
+//std::cout << " [a pi=" << _pg_index << " d="
<< dblks_rem() << " c=" <<
(_rrfc.is_compl()?"T":"F") << " wo=" <<
(_rrfc.is_wr_aio_outstanding()?"T":"F") << " status:"
<< _rrfc.file_handle()->status_str(s) << "]" <<
std::flush;
+ if(dblks_rem() == 0 && _rrfc.is_compl() &&
!_rrfc.is_wr_aio_outstanding())
{
-// aio_cycle(); // check if any AIOs have returned
+ aio_cycle(); // check if any AIOs have returned
return RHM_IORES_EMPTY;
}
//std::cout << " b" << std::flush;
@@ -110,10 +109,8 @@
// Read header, determine next record type
while (true)
{
-//std::cout << " [f pi=" << _pg_index << " d="
<< dblks_rem() << " f=" <<
(_rrfc.empty()?"T":"F") << "]" << std::flush;
-// if(dblks_rem() == 0 && _rrfc.is_full())
-// if(dblks_rem() == 0 && _rrfc.is_compl())
- if(dblks_rem() == 0 && _rrfc.is_compl() && _rrfc.is_wr_compl())
+//std::cout << " [f pi=" << _pg_index << " d="
<< dblks_rem() << " f=" <<
(_rrfc.empty()?"T":"F") << " status:" <<
_rrfc.file_handle()->status_str(s) << "]" << std::flush;
+ if(dblks_rem() == 0 && _rrfc.is_compl() &&
!_rrfc.is_wr_aio_outstanding())
{
aio_cycle(); // check if any AIOs have returned
return RHM_IORES_EMPTY;
Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-09-21 20:43:29 UTC (rev 939)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-09-22 15:09:17 UTC (rev 940)
@@ -124,7 +124,8 @@
inline const u_int32_t aio_outstanding_dblks()
{ return _curr_fh->rd_aio_outstanding_dblks(); }
inline const bool file_rotate() const { return _curr_fh->rd_file_rotate(); }
- inline const bool is_wr_compl() const { return _curr_fh->is_wr_compl(); }
+ inline const bool is_wr_aio_outstanding() const
+ { return _curr_fh->wr_aio_outstanding_dblks() > 0; }
}; // class rrfc
} // namespace journal
Modified: store/trunk/cpp/tests/Makefile.am
===================================================================
--- store/trunk/cpp/tests/Makefile.am 2007-09-21 20:43:29 UTC (rev 939)
+++ store/trunk/cpp/tests/Makefile.am 2007-09-22 15:09:17 UTC (rev 940)
@@ -17,8 +17,8 @@
TransactionalTest
TESTS = \
- system_test.sh \
- run-unit-tests
+ run-unit-tests \
+ system_test.sh
TESTS_ENVIRONMENT = \
QPID_DIR=$(QPID_DIR) \
Modified: store/trunk/cpp/tests/jrnl/Makefile.rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-21 20:43:29 UTC (rev 939)
+++ store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-22 15:09:17 UTC (rev 940)
@@ -73,7 +73,7 @@
CXX = g++
CXXINCLUDES = -I. -I../../lib -I../../../../qpid/cpp/src -I../../../../qpid/cpp/src/gen
CXXFLAGS = $(RHM_DEFINES) -Wall -Wextra -Werror -Wno-shadow -Wpointer-arith -Wcast-qual
-Wcast-align -Wno-long-long -Wvolatile-register-var -Winvalid-pch -Wno-system-headers
-pedantic -ggdb -O0 -pthread $(CXXINCLUDES)
-LDFLAGS = -lpthread -laio -lrt -lqpidcommon -L/home/kpvdr/qpid/cpp/src/.libs
+LDFLAGS = -lpthread -laio -lrt -lqpidcommon -L/home/kpvdr/redhat/qpid/cpp/src/.libs
.SUFFIXES:
.SUFFIXES: .cpp .o