Author: kpvdr
Date: 2009-07-31 11:06:34 -0400 (Fri, 31 Jul 2009)
New Revision: 3533
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/jcntl.cpp
Log:
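Related to BZ514278 - "QPID process goes down when running TSXTEST with
--durable_msg": Changed the journal read timeout to approx. 2 sec. per attempt
(MAX_RCINVALID_CNT 400 x RCINVALID_SLEEP_TIME_MS 5 ms), and added a
JournalImpl::read_data_record() override that retries a read returning
RHM_IORES_RCINVALID up to 5 times (approx. 10 sec. total), logging a warning
for each retried attempt at 2-sec. intervals and an error when it gives up. Also
made all references to mrg::journal within namespace mrg::store fully qualified
in JournalImpl.h/.cpp.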
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2009-07-30 20:43:52 UTC (rev 3532)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2009-07-31 15:06:34 UTC (rev 3533)
@@ -134,7 +134,7 @@
const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
- journal::aio_callback* const cbp)
+ mrg::journal::aio_callback* const cbp)
{
std::ostringstream oss;
oss << "Initialize; num_jfiles=" << num_jfiles << "
jfsize_sblks=" << jfsize_sblks;
@@ -166,7 +166,7 @@
const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
- journal::aio_callback* const cbp,
+ mrg::journal::aio_callback* const cbp,
boost::ptr_list<msgstore::PreparedTransaction>*
prep_tx_list_ptr,
u_int64_t& highest_rid,
u_int64_t queue_id)
@@ -280,7 +280,7 @@
while (!done) {
iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient,
_external, &_dtok);
switch (res) {
- case journal::RHM_IORES_SUCCESS:
+ case mrg::journal::RHM_IORES_SUCCESS:
if (_dtok.rid() != rid) {
free_read_buffers();
// Reset data token for next read
@@ -294,22 +294,22 @@
done = true;
}
break;
- case journal::RHM_IORES_PAGE_AIOWAIT:
+ case mrg::journal::RHM_IORES_PAGE_AIOWAIT:
if (++aio_sleep_cnt <= MAX_AIO_SLEEPS) {
get_wr_events();
usleep(AIO_SLEEP_TIME);
} else {
std::stringstream ss;
- ss << "read_data_record() returned " <<
journal::iores_str(res);
+ ss << "read_data_record() returned " <<
mrg::journal::iores_str(res);
ss << "; exceeded maximum wait time";
- throw jexception(journal::jerrno::JERR__TIMEOUT,
ss.str().c_str(), "JournalImpl",
+ throw jexception(mrg::journal::jerrno::JERR__TIMEOUT,
ss.str().c_str(), "JournalImpl",
"loadMsgContent");
}
break;
default:
std::stringstream ss;
- ss << "read_data_record() returned " <<
journal::iores_str(res);
- throw jexception(journal::jerrno::JERR__UNEXPRESPONSE,
ss.str().c_str(), "JournalImpl",
+ ss << "read_data_record() returned " <<
mrg::journal::iores_str(res);
+ throw jexception(mrg::journal::jerrno::JERR__UNEXPRESPONSE,
ss.str().c_str(), "JournalImpl",
"loadMsgContent");
}
}
@@ -318,7 +318,7 @@
ss << "read_data_record() was unable to find rid 0x" <<
std::hex << rid << std::dec;
ss << " (" << rid << "); last rid found was
0x" << std::hex << _dtok.rid() << std::dec;
ss << " (" << _dtok.rid() << ")";
- throw jexception(journal::jerrno::JERR__RECNFOUND, ss.str().c_str(),
"JournalImpl", "loadMsgContent");
+ throw jexception(mrg::journal::jerrno::JERR__RECNFOUND, ss.str().c_str(),
"JournalImpl", "loadMsgContent");
}
}
@@ -425,6 +425,30 @@
}
}
+#define MAX_INVALID_RETRYS 5 // This will yield a total wait time of 10 sec with 5 log
messages at 2 sec intervals
+// NOTE(review): the "10 sec" total above assumes each jcntl::read_data_record() call
+// below spends approx. 2 sec before returning RHM_IORES_RCINVALID, per the
+// MAX_RCINVALID_CNT (400) x RCINVALID_SLEEP_TIME_MS (5 ms) settings in jcntl.cpp.
+// Confirm this comment if either constant changes.
+//
+// Override of jcntl::read_data_record() that re-issues reads which come back
+// RHM_IORES_RCINVALID:
+// - Any result other than RHM_IORES_RCINVALID is returned as soon as it occurs.
+// - On RHM_IORES_RCINVALID the read is repeated, up to MAX_INVALID_RETRYS calls in
+//   total. A LOG_WARN message is written for each attempt that will be retried and
+//   a LOG_ERROR message for the final attempt.
+// - If every attempt returns RHM_IORES_RCINVALID, that value is returned to the
+//   caller (in loadMsgContent() it reaches the switch's default case and is thrown
+//   as JERR__UNEXPRESPONSE).
+// All parameters are passed through unchanged to jcntl::read_data_record().
+iores
+JournalImpl::read_data_record(void** const data_buff, size_t& tot_data_len, void**
const xid_buff, size_t& xid_len,
+ bool& transient, bool& external, mrg::journal::data_tok* const dtokp,
bool ignore_pending_txns)
+{
+ // Number of RHM_IORES_RCINVALID results seen so far.
+ int retry_cnt = 0;
+ // Always assigned on the first pass of the do/while below.
+ iores res;
+ do {
+ res = jcntl::read_data_record(data_buff, tot_data_len, xid_buff, xid_len,
transient, external, dtokp, ignore_pending_txns);
+ if (res == mrg::journal::RHM_IORES_RCINVALID) {
+ retry_cnt++;
+ std::ostringstream oss;
+ // Attempts before the last are retried (warning); the last one gives up (error).
+ if (retry_cnt < MAX_INVALID_RETRYS) {
+ oss << "Store read pipeline on queue " << _jid
<< " timed out waiting for journal header file read, retrying...";
+ log(LOG_WARN, oss.str());
+ } else {
+ oss << "Store read pipeline on queue " << _jid
<< " timed out waiting for journal header file read, aborting read with
RHM_IORES_RCINVALID";
+ log(LOG_ERROR, oss.str());
+ }
+ }
+ // Loop only while reads keep returning RCINVALID and the retry budget remains.
+ } while (res == mrg::journal::RHM_IORES_RCINVALID && retry_cnt <
MAX_INVALID_RETRYS);
+ return res;
+}
+
void
JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
{
@@ -468,13 +492,13 @@
}
void
-JournalImpl::log(journal::log_level ll, const std::string& log_stmt) const
+JournalImpl::log(mrg::journal::log_level ll, const std::string& log_stmt) const
{
log(ll, log_stmt.c_str());
}
void
-JournalImpl::log(journal::log_level ll, const char* const log_stmt) const
+JournalImpl::log(mrg::journal::log_level ll, const char* const log_stmt) const
{
switch (ll)
{
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2009-07-30 20:43:52 UTC (rev 3532)
+++ store/trunk/cpp/lib/JournalImpl.h 2009-07-31 15:06:34 UTC (rev 3533)
@@ -25,6 +25,7 @@
#define _JournalImpl_
#include <set>
+#include "jrnl/enums.hpp"
#include "jrnl/jcntl.hpp"
#include "jrnl/slock.hpp"
#include "DataTokenImpl.h"
@@ -68,7 +69,7 @@
inline void cancel() { mrg::journal::slock s(&_gefe_mutex); parent = 0;
}
};
- class JournalImpl : public qpid::broker::ExternalQueueStore, public
journal::jcntl, public journal::aio_callback
+ class JournalImpl : public qpid::broker::ExternalQueueStore, public
mrg::journal::jcntl, public mrg::journal::aio_callback
{
private:
static qpid::broker::Timer* journalTimerPtr;
@@ -88,7 +89,7 @@
void* _xidp;
void* _datap;
size_t _dlen;
- journal::data_tok _dtok;
+ mrg::journal::data_tok _dtok;
bool _external;
qpid::management::ManagementAgent* _agent;
@@ -110,7 +111,7 @@
const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
- journal::aio_callback* const cbp);
+ mrg::journal::aio_callback* const cbp);
inline void initialize(const u_int16_t num_jfiles,
const bool auto_expand,
@@ -128,7 +129,7 @@
const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
- journal::aio_callback* const cbp,
+ mrg::journal::aio_callback* const cbp,
boost::ptr_list<msgstore::PreparedTransaction>*
prep_tx_list_ptr,
u_int64_t& highest_rid,
u_int64_t queue_id);
@@ -155,42 +156,46 @@
// Overrides for write inactivity timer
void enqueue_data_record(const void* const data_buff, const size_t
tot_data_len,
- const size_t this_data_len, journal::data_tok* dtokp,
+ const size_t this_data_len, mrg::journal::data_tok* dtokp,
const bool transient = false);
- void enqueue_extern_data_record(const size_t tot_data_len, journal::data_tok*
dtokp,
+ void enqueue_extern_data_record(const size_t tot_data_len,
mrg::journal::data_tok* dtokp,
const bool transient = false);
void enqueue_txn_data_record(const void* const data_buff, const size_t
tot_data_len,
- const size_t this_data_len, journal::data_tok* dtokp, const
std::string& xid,
+ const size_t this_data_len, mrg::journal::data_tok* dtokp, const
std::string& xid,
const bool transient = false);
- void enqueue_extern_txn_data_record(const size_t tot_data_len,
journal::data_tok* dtokp,
+ void enqueue_extern_txn_data_record(const size_t tot_data_len,
mrg::journal::data_tok* dtokp,
const std::string& xid, const bool transient = false);
- void dequeue_data_record(journal::data_tok* const dtokp, const bool
txn_coml_commit = false);
+ void dequeue_data_record(mrg::journal::data_tok* const dtokp, const bool
txn_coml_commit = false);
- void dequeue_txn_data_record(journal::data_tok* const dtokp, const
std::string& xid, const bool txn_coml_commit = false);
+ void dequeue_txn_data_record(mrg::journal::data_tok* const dtokp, const
std::string& xid, const bool txn_coml_commit = false);
- void txn_abort(journal::data_tok* const dtokp, const std::string& xid);
+ mrg::journal::iores read_data_record(void** const data_buff, size_t&
tot_data_len, void** const xid_buff,
+ size_t& xid_len, bool& transient, bool& external,
mrg::journal::data_tok* const dtokp,
+ bool ignore_pending_txns = false);
- void txn_commit(journal::data_tok* const dtokp, const std::string& xid);
+ void txn_abort(mrg::journal::data_tok* const dtokp, const std::string&
xid);
+ void txn_commit(mrg::journal::data_tok* const dtokp, const std::string&
xid);
+
void stop(bool block_till_aio_cmpl = false);
// Logging
- void log(journal::log_level level, const std::string& log_stmt) const;
- void log(journal::log_level level, const char* const log_stmt) const;
+ void log(mrg::journal::log_level level, const std::string& log_stmt)
const;
+ void log(mrg::journal::log_level level, const char* const log_stmt) const;
// Overrides for get_events timer
- journal::iores flush(const bool block_till_aio_cmpl = false);
+ mrg::journal::iores flush(const bool block_till_aio_cmpl = false);
// TimerTask callback
void getEventsFire();
void flushFire();
// AIO callbacks
- virtual void wr_aio_cb(std::vector<journal::data_tok*>& dtokl);
+ virtual void wr_aio_cb(std::vector<mrg::journal::data_tok*>&
dtokl);
virtual void rd_aio_cb(std::vector<u_int16_t>& pil);
qpid::management::ManagementObject* GetManagementObject (void) const
@@ -209,7 +214,7 @@
journalTimerPtr->add(getEventsFireEventsPtr);
getEventsTimerSetFlag = true;
}
- void handleIoResult(const journal::iores r);
+ void handleIoResult(const mrg::journal::iores r);
// Management instrumentation callbacks overridden from jcntl
inline void instr_incr_outstanding_aio_cnt() {
@@ -235,9 +240,9 @@
~TplJournalImpl() {}
// Special version of read_data_record that ignores transactions - needed
when reading the TPL
- inline journal::iores read_data_record(void** const datapp, std::size_t&
dsize,
+ inline mrg::journal::iores read_data_record(void** const datapp,
std::size_t& dsize,
void** const xidpp, std::size_t& xidsize, bool& transient,
bool& external,
- journal::data_tok* const dtokp) {
+ mrg::journal::data_tok* const dtokp) {
return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize,
transient, external, dtokp, true);
}
inline void read_reset() { _rmgr.invalidate(); }
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2009-07-30 20:43:52 UTC (rev 3532)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2009-07-31 15:06:34 UTC (rev 3533)
@@ -268,7 +268,8 @@
return _rmgr.discard(dtokp);
} */
-#define MAX_RCINVALID_CNT 500
+// These two combined make a wait time of approx. 2 sec.
+#define MAX_RCINVALID_CNT 400
#define RCINVALID_SLEEP_TIME_MS 5
iores
jcntl::read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp,
std::size_t& xidsize,