Author: kpvdr
Date: 2008-11-24 14:41:05 -0500 (Mon, 24 Nov 2008)
New Revision: 2875
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/jrnl/jerrno.cpp
store/trunk/cpp/lib/jrnl/jerrno.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/rrfc.cpp
Log:
Fix for BZ472215 "qpidd rmgr::get_events() threw JERR__AIO: AIO error". Also fix
for txtest failures where journal is forced to run in extern mode.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-11-24 19:41:05 UTC (rev 2875)
@@ -246,8 +246,14 @@
{
// Free any previous msg
free_read_buffers();
+
+ // TODO: This is a brutal approach - very inefficient and slow. Rather introduce a system of remembering
+ // jumpover points and allow the read to jump back to the first known jumpover point - but this needs
+ // a mechanism in rrfc to accomplish it. Also helpful is a struct containing a journal address - a
+ // combination of lid/offset.
if (rid < lastReadRid)
_rmgr.invalidate();
+
_dlen = 0;
_dtok.reset();
_dtok.set_wstate(DataTokenImpl::ENQ);
@@ -262,9 +268,9 @@
iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient,
_external, &_dtok);
switch (res) {
case journal::RHM_IORES_SUCCESS:
- if (_dtok.rid() < rid) {
+ if (_dtok.rid() != rid) {
free_read_buffers();
- // reset data token for next read
+ // Reset data token for next read
_dlen = 0;
_dtok.reset();
_dtok.set_wstate(DataTokenImpl::ENQ);
@@ -283,19 +289,23 @@
std::stringstream ss;
ss << "read_data_record() returned " <<
journal::iores_str(res);
ss << "; exceeded maximum wait time";
- throw jexception(0, ss.str().c_str(), "JournalImpl",
"loadMsgContent");
+ throw jexception(journal::jerrno::JERR__TIMEOUT,
ss.str().c_str(), "JournalImpl",
+ "loadMsgContent");
}
break;
default:
std::stringstream ss;
ss << "read_data_record() returned " <<
journal::iores_str(res);
- throw jexception(0, ss.str().c_str(), "JournalImpl",
"loadMsgContent");
+ throw jexception(journal::jerrno::JERR__UNEXPRESPONSE,
ss.str().c_str(), "JournalImpl",
+ "loadMsgContent");
}
}
if (!rid_found) {
std::stringstream ss;
- ss << "read_data_record() was unable to find rid " <<
rid << "; last rid found was " << _dtok.rid();
- throw jexception(0, ss.str().c_str(), "JournalImpl",
"loadMsgContent");
+ ss << "read_data_record() was unable to find rid 0x" <<
std::hex << rid << std::dec;
+ ss << " (" << rid << "); last rid found was
0x" << std::hex << _dtok.rid() << std::dec;
+ ss << " (" << _dtok.rid() << ")";
+ throw jexception(journal::jerrno::JERR__RECNFOUND, ss.str().c_str(),
"JournalImpl", "loadMsgContent");
}
}
if (_external)
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2008-11-24 19:41:05 UTC (rev 2875)
@@ -1257,8 +1257,7 @@
}
}
} catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
- ": loadContent() failed: " + e.what());
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
": loadContent() failed: " + e.what());
}
TxnCtxt txn;
txn.begin(env, true);
Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp 2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp 2008-11-24 19:41:05 UTC (rev 2875)
@@ -51,6 +51,9 @@
const u_int32_t jerrno::JERR__FILEIO = 0x0104;
const u_int32_t jerrno::JERR__RTCLOCK = 0x0105;
const u_int32_t jerrno::JERR__PTHREAD = 0x0106;
+const u_int32_t jerrno::JERR__TIMEOUT = 0x0107;
+const u_int32_t jerrno::JERR__UNEXPRESPONSE = 0x0108;
+const u_int32_t jerrno::JERR__RECNFOUND = 0x0109;
// class jcntl
const u_int32_t jerrno::JERR_JCNTL_STOPPED = 0x0200;
@@ -138,6 +141,9 @@
_err_map[JERR__FILEIO] = "JERR__FILEIO: File read or write failure.";
_err_map[JERR__RTCLOCK] = "JERR__RTCLOCK: Reading real-time clock
failed.";
_err_map[JERR__PTHREAD] = "JERR__PTHREAD: pthread failure.";
+ _err_map[JERR__TIMEOUT] = "JERR__TIMEOUT: Timeout waiting for event.";
+ _err_map[JERR__UNEXPRESPONSE] = "JERR__UNEXPRESPONSE: Unexpected response to
call or event.";
+ _err_map[JERR__RECNFOUND] = "JERR__RECNFOUND: Record not found.";
// class jcntl
_err_map[JERR_JCNTL_STOPPED] = "JERR_JCNTL_STOPPED: Operation on stopped
journal.";
Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp 2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp 2008-11-24 19:41:05 UTC (rev 2875)
@@ -69,6 +69,9 @@
static const u_int32_t JERR__FILEIO; ///< File read or write
failure
static const u_int32_t JERR__RTCLOCK; ///< Reading real-time clock
failed
static const u_int32_t JERR__PTHREAD; ///< pthread failure
+ static const u_int32_t JERR__TIMEOUT; ///< Timeout waiting for an
event
+ static const u_int32_t JERR__UNEXPRESPONSE; ///< Unexpected response to
call or event
+ static const u_int32_t JERR__RECNFOUND; ///< Record not found
// class jcntl
static const u_int32_t JERR_JCNTL_STOPPED; ///< Operation on stopped
journal
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-11-24 19:41:05 UTC (rev 2875)
@@ -199,8 +199,7 @@
oss << "rid=0x" << std::setw(16)
<< _hdr._rid;
oss << "; dtok_rid=" << std::setw(16)
<< dtokp->rid();
oss << "; dtok_id=0x" << std::setw(8)
<< dtokp->id();
- throw jexception(jerrno::JERR_RMGR_RIDMISMATCH, oss.str(),
"rmgr",
- "read");
+ throw jexception(jerrno::JERR_RMGR_RIDMISMATCH, oss.str(),
"rmgr", "read");
}
}
else
@@ -332,12 +331,38 @@
rmgr::invalidate()
{
if (_rrfc.is_valid())
+ _rrfc.set_invalid();
+}
+
+#define MAX_AIO_SLEEPS 1000 // 10 sec
+#define AIO_SLEEP_TIME 10000 // 10 ms
+void
+rmgr::init_validation()
+{
+ // Wait for any outstanding AIO read operations to complete before synchronizing
+ int aio_sleep_cnt = 0;
+ while (_aio_evt_rem)
{
- for (int i=0; i<_cache_num_pages; i++)
- _page_cb_arr[i]._state = UNUSED;
- _rrfc.unset_findex();
- _pg_offset_dblks = 0;
+ get_events();
+ if (_aio_evt_rem)
+ {
+ if (++aio_sleep_cnt <= MAX_AIO_SLEEPS)
+ {
+ get_events();
+ usleep(AIO_SLEEP_TIME);
+ }
+ else
+ throw jexception(jerrno::JERR__TIMEOUT,
+ "Invalidate timed out waiting for outstanding read aio to
return", "rmgr", "invalidate");
+ }
}
+
+ // Reset all read states and pointers
+ for (int i=0; i<_cache_num_pages; i++)
+ _page_cb_arr[i]._state = UNUSED;
+ _rrfc.unset_findex();
+ _pg_index = 0;
+ _pg_offset_dblks = 0;
}
void
@@ -525,6 +550,7 @@
{
int16_t first_uninit = -1;
u_int16_t num_uninit = 0;
+ u_int16_t num_compl = 0;
bool outstanding = false;
// Index must start with current buffer and cycle around so that first
// uninitialized buffer is initialized first
@@ -543,12 +569,17 @@
case AIO_PENDING:
outstanding = true;
break;
+ case AIO_COMPLETE:
+ num_compl++;
+ break;
default:;
}
}
iores res = RHM_IORES_SUCCESS;
if (num_uninit)
res = init_aio_reads(first_uninit, num_uninit);
+ else if (num_compl == _cache_num_pages) // This condition exists after invalidation
+ res = init_aio_reads(0, _cache_num_pages);
if (outstanding)
get_events();
return res;
@@ -564,6 +595,7 @@
if (!_rrfc.is_valid())
{
+ init_validation();
_jc->get_earliest_fid(); // calls _rrfc.set_findex()
// If this file has not yet been written to, return RHM_IORES_EMPTY
if (_rrfc.is_void() && !_rrfc.is_wr_aio_outstanding())
@@ -584,6 +616,7 @@
// space into all contiguous empty pages in one AIO operation.
u_int32_t file_rem_dblks = _rrfc.remaining_dblks();
+ file_rem_dblks -= file_rem_dblks % JRNL_SBLK_SIZE; // round down to closest sblk
boundary
u_int32_t pg_size_dblks = JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
u_int32_t rd_size = file_rem_dblks > pg_size_dblks ? pg_size_dblks :
file_rem_dblks;
if (rd_size)
@@ -592,8 +625,7 @@
// TODO: For perf, combine contiguous pages into single read
// 1 or 2 AIOs needed depending on whether read block folds
aio_cb* aiocbp = &_aio_cb_arr[pi];
- aio::prep_pread_2(aiocbp, _rrfc.fh(), _page_ptr_arr[pi],
- rd_size * JRNL_DBLK_SIZE, _rrfc.subm_offs());
+ aio::prep_pread_2(aiocbp, _rrfc.fh(), _page_ptr_arr[pi], rd_size *
JRNL_DBLK_SIZE, _rrfc.subm_offs());
if (aio::submit(_ioctx, 1, &aiocbp) < 0)
throw jexception(jerrno::JERR__AIO, "rmgr",
"init_aio_reads");
_rrfc.add_subm_cnt_dblks(rd_size);
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2008-11-24 19:41:05 UTC (rev 2875)
@@ -92,6 +92,7 @@
private:
void initialize();
void clean();
+ void init_validation();
iores pre_read_check(data_tok* dtokp);
iores read_enq(rec_hdr& h, void* rptr, data_tok* dtokp);
void consume_xid_rec(rec_hdr& h, void* rptr, data_tok* dtokp);
Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-11-24 19:40:44 UTC (rev 2874)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-11-24 19:41:05 UTC (rev 2875)
@@ -79,10 +79,7 @@
u_int16_t next_fc_index = _fc_index + 1;
if (next_fc_index == _lfmp->num_jfiles())
next_fc_index = 0;
- fcntl* next_fc = _lfmp->get_fcntlp(next_fc_index);
- _fc_index = next_fc_index;
- _curr_fc = next_fc;
- open_fh(_curr_fc->fname());
+ set_findex(next_fc_index);
return RHM_IORES_SUCCESS;
}