[rhmessaging-commits] rhmessaging commits: r2801 - in store/trunk/cpp: lib/jrnl and 1 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Fri Nov 14 11:23:43 EST 2008
Author: kpvdr
Date: 2008-11-14 11:23:43 -0500 (Fri, 14 Nov 2008)
New Revision: 2801
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/tests/jrnl/_st_read.cpp
Log:
Fix for BZ471601 "Read tests may fail with RHM_IORES_EMPTY" and BZ471606 "TwoPhaseCommitTest fails intermittently with valgrind illegal read/write error"
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-11-13 15:56:34 UTC (rev 2800)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-11-14 16:23:43 UTC (rev 2801)
@@ -25,7 +25,6 @@
#include "jrnl/jerrno.hpp"
#include "jrnl/jexception.hpp"
-#include "jrnl/slock.hpp"
#include "qpid/log/Statement.h"
#include "qpid/agent/ManagementAgent.h"
#include "qmf/com/redhat/rhm/store/ArgsJournalExpand.h"
@@ -40,12 +39,10 @@
qpid::broker::Timer* JournalImpl::journalTimerPtr = 0;
u_int32_t JournalImpl::cnt = 0;
-void InactivityFireEvent::fire() { if (parent) parent->flushFire(); }
-void GetEventsFireEvent::fire() {
- if (parent) parent->getEventsFire();
- release();
-}
+void InactivityFireEvent::fire() { slock s(&_ife_mutex); if (parent) parent->flushFire(); }
+void GetEventsFireEvent::fire() { slock s(&_gefe_mutex); if (parent) parent->getEventsFire(); }
+
JournalImpl::JournalImpl(const std::string& journalId,
const std::string& journalDirectory,
const std::string& journalBaseFilename,
@@ -453,9 +450,7 @@
{
slock s(&_getf_mutex);
getEventsTimerSetFlag = false;
- if (_wmgr.get_aio_evt_rem()) {
- jcntl::get_wr_events();
- }
+ if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(); }
if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
}
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2008-11-13 15:56:34 UTC (rev 2800)
+++ store/trunk/cpp/lib/JournalImpl.h 2008-11-14 16:23:43 UTC (rev 2801)
@@ -26,6 +26,7 @@
#include <set>
#include "jrnl/jcntl.hpp"
+#include "jrnl/slock.hpp"
#include "DataTokenImpl.h"
#include "PreparedTransaction.h"
#include <qpid/broker/Timer.h>
@@ -44,25 +45,27 @@
class InactivityFireEvent : public virtual qpid::broker::TimerTask
{
JournalImpl* parent;
+ pthread_mutex_t _ife_mutex;
public:
InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::broker::TimerTask(timeout), parent(p) {}
- virtual ~InactivityFireEvent() {}
+ qpid::broker::TimerTask(timeout), parent(p) { ::pthread_mutex_init(&_ife_mutex, 0); }
+ virtual ~InactivityFireEvent() { ::pthread_mutex_destroy(&_ife_mutex); }
void fire();
- inline void cancel() { parent=0; }
+ inline void cancel() { mrg::journal::slock s(&_ife_mutex); parent = 0; }
};
class GetEventsFireEvent : public virtual qpid::broker::TimerTask
{
JournalImpl* parent;
+ pthread_mutex_t _gefe_mutex;
public:
GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::broker::TimerTask(timeout), parent(p) {}
- virtual ~GetEventsFireEvent() {}
+ qpid::broker::TimerTask(timeout), parent(p) { ::pthread_mutex_init(&_gefe_mutex, 0); }
+ virtual ~GetEventsFireEvent() { ::pthread_mutex_destroy(&_gefe_mutex); }
void fire();
- inline void cancel() { parent=0; }
+ inline void cancel() { mrg::journal::slock s(&_gefe_mutex); parent = 0; }
};
class JournalImpl : public qpid::broker::ExternalQueueStore, public journal::jcntl
@@ -73,7 +76,7 @@
bool getEventsTimerSetFlag;
boost::intrusive_ptr<qpid::broker::TimerTask> getEventsFireEventsPtr;
- pthread_mutex_t _getf_mutex; // getEventsTimerSetFlag mutex
+ pthread_mutex_t _getf_mutex;
u_int64_t lastReadRid; // rid of last read msg for loadMsgContent()
@@ -197,7 +200,6 @@
inline void setGetEventTimer()
{
- getEventsFireEventsPtr->addRef();
assert(journalTimerPtr != 0);
journalTimerPtr->add(getEventsFireEventsPtr);
getEventsTimerSetFlag = true;
@@ -236,7 +238,7 @@
return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
}
inline void read_reset() { _rmgr.invalidate(); }
- };
+ }; // class TplJournalImpl
} // namespace msgstore
} // namespace mrg
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-11-13 15:56:34 UTC (rev 2800)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-11-14 16:23:43 UTC (rev 2801)
@@ -110,6 +110,9 @@
* journal will no longer accept messages until either initialize() or recover() is called.
* There is no way other than through initialization to reset this flag.
*/
+ // TODO: It would be helpful to distinguish between the "stopping" and "stopped" states. If stop(true) is called,
+ // then we are stopping, but must wait for all outstanding AIOs to return before being finally stopped. During
+ // this period, however, no new enqueue/dequeue/read requests may be accepted.
bool _stop_flag;
/**
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-11-13 15:56:34 UTC (rev 2800)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-11-14 16:23:43 UTC (rev 2801)
@@ -132,10 +132,13 @@
if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
{
aio_cycle(); // check if rd AIOs returned; initiate new reads if possible
- if (_jc->unflushed_dblks() > 0)
- _jc->flush();
- else if (!_aio_evt_rem)
- return RHM_IORES_EMPTY;
+ if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+ {
+ if (_jc->unflushed_dblks() > 0)
+ _jc->flush();
+ else if (!_aio_evt_rem)
+ return RHM_IORES_EMPTY;
+ }
}
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
@@ -359,10 +362,13 @@
if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
{
aio_cycle(); // check if any AIOs have returned
- if (_jc->unflushed_dblks() > 0)
- _jc->flush();
- else if (!_aio_evt_rem)
- return RHM_IORES_EMPTY;
+ if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+ {
+ if (_jc->unflushed_dblks() > 0)
+ _jc->flush();
+ else if (!_aio_evt_rem)
+ return RHM_IORES_EMPTY;
+ }
}
// Check write state of this token is ENQ - required for read
Modified: store/trunk/cpp/tests/jrnl/_st_read.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_read.cpp 2008-11-13 15:56:34 UTC (rev 2800)
+++ store/trunk/cpp/tests/jrnl/_st_read.cpp 2008-11-14 16:23:43 UTC (rev 2801)
@@ -107,21 +107,24 @@
test_jrnl jc(test_name, test_dir, test_name);
jc.initialize(2*NUM_TEST_JFILES, false, 0, 10*TEST_JFSIZE_SBLKS);
- for (int m=0; m<NUM_MSGS*125; m++)
- enq_msg(jc, m, create_msg(msg, m, 16*MSG_SIZE), false);
- jc.flush();
- for (int m=0; m<NUM_MSGS*125; m++)
+ for (int i=0; i<10; i++)
{
- read_msg(jc, rmsg, xid, transientFlag, externalFlag);
- BOOST_CHECK_EQUAL(create_msg(msg, m, 16*MSG_SIZE), rmsg);
- BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
- BOOST_CHECK_EQUAL(transientFlag, false);
- BOOST_CHECK_EQUAL(externalFlag, false);
+ for (int m=0; m<NUM_MSGS*125; m++)
+ enq_msg(jc, m, create_msg(msg, m, 16*MSG_SIZE), false);
+ jc.flush();
+ for (int m=0; m<NUM_MSGS*125; m++)
+ {
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag);
+ BOOST_CHECK_EQUAL(create_msg(msg, m, 16*MSG_SIZE), rmsg);
+ BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
+ BOOST_CHECK_EQUAL(transientFlag, false);
+ BOOST_CHECK_EQUAL(externalFlag, false);
+ }
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+ for (int m=0; m<NUM_MSGS*125; m++)
+ deq_msg(jc, m, m+NUM_MSGS*125);
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
}
- read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
- for (int m=0; m<NUM_MSGS*125; m++)
- deq_msg(jc, m, m+NUM_MSGS*125);
- read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
}
catch(const exception& e) { BOOST_FAIL(e.what()); }
cout << "ok" << endl;
@@ -140,7 +143,7 @@
test_jrnl jc(test_name, test_dir, test_name);
jc.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
- for (int m=0; m<2*NUM_MSGS; m+=2)
+ for (int m=0; m<500*NUM_MSGS; m+=2)
{
enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
jc.flush();
More information about the rhmessaging-commits
mailing list