[rhmessaging-commits] rhmessaging commits: r1501 - in store/trunk/cpp/lib: jrnl and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Mon Dec 17 11:21:15 EST 2007
Author: kpvdr
Date: 2007-12-17 11:21:15 -0500 (Mon, 17 Dec 2007)
New Revision: 1501
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/slock.hpp
Log:
Removed locks and monitors from class JournalImpl; returned to the simpler approach of waiting for AIO to return by sleeping and periodically checking for AIO returns on the thread that had its write interrupted, while all other write threads remain locked by jcntl's write mutex.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-12-17 14:32:28 UTC (rev 1500)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-12-17 16:21:15 UTC (rev 1501)
@@ -50,8 +50,6 @@
getEventsTimerSetFlag(false),
writeActivityFlag(false),
flushTriggeredFlag(true),
- aioWait(false),
- aioWaitLock(false),
_xidp(0),
_datap(0),
_dlen(0),
@@ -179,63 +177,52 @@
JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const bool transient)
{
- writeLockCheck();
- while(handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp,
- transient)));
+ handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient));
}
void
JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp,
const bool transient)
{
- writeLockCheck();
- while(handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient)));
+ handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient));
}
void
JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const std::string& xid, const bool transient)
{
- writeLockCheck();
- while(handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len,
- dtokp, xid, transient)));
+ handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, transient));
}
void
JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
const std::string& xid, const bool transient)
{
- writeLockCheck();
- while(handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid,
- transient)));
+ handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, transient));
}
void
JournalImpl::dequeue_data_record(data_tok* const dtokp)
{
- writeLockCheck();
- while(handleIoResult(jcntl::dequeue_data_record(dtokp)));
+ handleIoResult(jcntl::dequeue_data_record(dtokp));
}
void
JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
{
- writeLockCheck();
- while(handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid)));
+ handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid));
}
void
JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
{
- writeLockCheck();
- while(handleIoResult(jcntl::txn_abort(dtokp, xid)));
+ handleIoResult(jcntl::txn_abort(dtokp, xid));
}
void
JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid)
{
- writeLockCheck();
- while(handleIoResult(jcntl::txn_commit(dtokp, xid)));
+ handleIoResult(jcntl::txn_commit(dtokp, xid));
}
void
@@ -295,36 +282,13 @@
}
void
-JournalImpl::writeLockCheck()
-{
- qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
- if (aioWait)
- {
- aioWaitLock = true;
- writeLock.wait();
- }
-}
-
-const bool
JournalImpl::handleIoResult(const iores r)
{
writeActivityFlag = true;
switch (r)
{
case rhm::journal::RHM_IORES_SUCCESS:
- {
- qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
- aioWait = false;
- }
- return false;
- case rhm::journal::RHM_IORES_AIO_WAIT:
- {
- qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
- aioWait = true;
- }
- usleep(1000); // TODO: add counter here to limit the time spent in this loop?
- get_wr_events();
- return true;
+ return;
case rhm::journal::RHM_IORES_ENQCAPTHRESH:
{
std::ostringstream oss;
@@ -382,7 +346,6 @@
dtokp->release();
this_dtok_list.pop_front();
}
- jip->notifyWriteMonitor();
}
void
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-12-17 14:32:28 UTC (rev 1500)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-12-17 16:21:15 UTC (rev 1501)
@@ -74,10 +74,6 @@
bool writeActivityFlag;
bool flushTriggeredFlag;
qpid::intrusive_ptr<qpid::broker::TimerTask> inactivityFireEventPtr;
-
- qpid::sys::Monitor writeLock;
- bool aioWait;
- bool aioWaitLock; // monitor lock taken
// temp local vars for loadMsgContent below
void* _xidp;
@@ -152,18 +148,10 @@
// TimerTask callback
void getEventsFire();
void flushFire();
-
- // Notify write monitor
- inline void notifyWriteMonitor()
- {
- qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
- if (aioWaitLock) { aioWaitLock = false; writeLock.notify(); }
- }
private:
- void writeLockCheck();
- const bool handleIoResult(const journal::iores r);
- static void aio_wr_callback(jcntl* journal, u_int32_t num_dtoks);
+ void handleIoResult(const journal::iores r);
+ static void aio_wr_callback(jcntl* journal, u_int32_t num_dtoks);
static void aio_rd_callback(jcntl* journal, u_int32_t num_dtoks);
}; // class JournalImpl
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-17 14:32:28 UTC (rev 1500)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-17 16:21:15 UTC (rev 1501)
@@ -42,6 +42,7 @@
#include <jrnl/file_hdr.hpp>
#include <jrnl/jerrno.hpp>
#include <jrnl/jinf.hpp>
+#include <jrnl/slock.hpp>
#include <sstream>
#include <unistd.h>
@@ -207,16 +208,26 @@
const size_t this_data_len, data_tok* dtokp, const bool transient)
{
check_wstatus("enqueue_data_record");
- slock s(&_mutex);
- return _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, NULL, 0, transient, false);
+ {
+ slock s(&_mutex);
+ iores r;
+ while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, NULL,
+ 0, transient, false), r));
+ return r;
+ }
}
const iores
jcntl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp, const bool transient)
{
check_wstatus("enqueue_extern_data_record");
- slock s(&_mutex);
- return _wmgr.enqueue(NULL, tot_data_len, 0, dtokp, NULL, 0, transient, true);
+ {
+ slock s(&_mutex);
+ iores r;
+ while (handle_aio_wait(_wmgr.enqueue(NULL, tot_data_len, 0, dtokp, NULL, 0, transient,
+ true), r));
+ return r;
+ }
}
const iores
@@ -225,9 +236,13 @@
const bool transient)
{
check_wstatus("enqueue_tx_data_record");
- slock s(&_mutex);
- return _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
- transient, false);
+ {
+ slock s(&_mutex);
+ iores r;
+ while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp,
+ xid.data(), xid.size(), transient, false), r));
+ return r;
+ }
}
const iores
@@ -235,8 +250,13 @@
const std::string& xid, const bool transient)
{
check_wstatus("enqueue_extern_txn_data_record");
- slock s(&_mutex);
- return _wmgr.enqueue(NULL, tot_data_len, 0, dtokp, xid.data(), xid.size(), transient, true);
+ {
+ slock s(&_mutex);
+ iores r;
+ while (handle_aio_wait(_wmgr.enqueue(NULL, tot_data_len, 0, dtokp, xid.data(), xid.size(),
+ transient, true), r));
+ return r;
+ }
}
const iores
@@ -266,32 +286,48 @@
jcntl::dequeue_data_record(data_tok* const dtokp)
{
check_wstatus("dequeue_data");
- slock s(&_mutex);
- return _wmgr.dequeue(dtokp, NULL, 0);
+ {
+ slock s(&_mutex);
+ iores r;
+ while (handle_aio_wait(_wmgr.dequeue(dtokp, NULL, 0), r));
+ return r;
+ }
}
const iores
jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
{
check_wstatus("dequeue_data");
- slock s(&_mutex);
- return _wmgr.dequeue(dtokp, xid.data(), xid.size());
+ {
+ slock s(&_mutex);
+ iores r;
+ while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size()), r));
+ return r;
+ }
}
const iores
jcntl::txn_abort(data_tok* const dtokp, const std::string& xid)
{
check_wstatus("txn_abort");
- slock s(&_mutex);
- return _wmgr.abort(dtokp, xid.data(), xid.size());
+ {
+ slock s(&_mutex);
+ iores r;
+ while (handle_aio_wait(_wmgr.abort(dtokp, xid.data(), xid.size()), r));
+ return r;
+ }
}
const iores
jcntl::txn_commit(data_tok* const dtokp, const std::string& xid)
{
check_wstatus("txn_commit");
- slock s(&_mutex);
- return _wmgr.commit(dtokp, xid.data(), xid.size());
+ {
+ slock s(&_mutex);
+ iores r;
+ while (handle_aio_wait(_wmgr.commit(dtokp, xid.data(), xid.size()), r));
+ return r;
+ }
}
const bool
@@ -338,8 +374,10 @@
return RHM_IORES_SUCCESS;
if (_readonly_flag)
throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl", "flush");
- slock s(&_mutex);
- return _wmgr.flush();
+ {
+ slock s(&_mutex);
+ return _wmgr.flush();
+ }
}
// Private functions
@@ -394,6 +432,24 @@
}
}
+const bool
+jcntl::handle_aio_wait(const iores res, iores& resout)
+{
+ resout = res;
+ if (res == RHM_IORES_AIO_WAIT)
+ {
+ u_int32_t cnt = 0;
+ while (get_wr_events() == 0)
+ {
+ if (cnt++ > MAX_AIO_CMPL_SLEEPS)
+ throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "aio_cmpl_wait");
+ usleep(AIO_CMPL_SLEEP);
+ }
+ return true;
+ }
+ return false;
+}
+
void
jcntl::rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list)
{
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-17 14:32:28 UTC (rev 1500)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-17 16:21:15 UTC (rev 1501)
@@ -46,7 +46,6 @@
#include <jrnl/lfh.hpp>
#include <jrnl/rcvdat.hpp>
#include <jrnl/rmgr.hpp>
-#include <jrnl/slock.hpp>
#include <jrnl/wmgr.hpp>
#include <jrnl/wrfc.hpp>
@@ -609,6 +608,12 @@
void aio_cmpl_wait();
/**
+ * \brief Call that blocks until at least one message returns; used to wait for
+ * AIO wait conditions to clear.
+ */
+ const bool handle_aio_wait(const iores res, iores& resout);
+
+ /**
* \brief Analyze journal for recovery.
*/
void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list);
Modified: store/trunk/cpp/lib/jrnl/slock.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/slock.hpp 2007-12-17 14:32:28 UTC (rev 1500)
+++ store/trunk/cpp/lib/jrnl/slock.hpp 2007-12-17 16:21:15 UTC (rev 1501)
@@ -3,8 +3,8 @@
*
* Red Hat Messaging - Message Journal
*
-* Messaging journal top-level control and interface class
-* rhm::journal::slock. See class documentation for details.
+* Messaging journal scoped lock class rhm::journal::slock and scoped try-lock
+* class rhm::journal::stlock.
*
* \author Kim van der Riet
*
More information about the rhmessaging-commits
mailing list