[rhmessaging-commits] rhmessaging commits: r1498 - in store/trunk/cpp: tests and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Sun Dec 16 21:44:37 EST 2007
Author: cctrieloff
Date: 2007-12-16 21:44:36 -0500 (Sun, 16 Dec 2007)
New Revision: 1498
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/tests/run-unit-tests
Log:
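- updated locking: write operations now call writeLockCheck() instead of holding writeLock for the duration of the call
- also call stop() from the JournalImpl destructor if the journal is initialised and not yet stopped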
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-12-14 21:53:32 UTC (rev 1497)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-12-17 02:44:36 UTC (rev 1498)
@@ -51,6 +51,7 @@
writeActivityFlag(false),
flushTriggeredFlag(true),
aioWait(false),
+ aioWaitLock(false),
_xidp(0),
_datap(0),
_dlen(0),
@@ -65,6 +66,10 @@
JournalImpl::~JournalImpl()
{
+ if (_init_flag && !_stop_flag){
+ try { stop(true); }
+ catch (const jexception& e) { std::cerr << e << std::endl; }
+ }
(dynamic_cast<GetEventsFireEvent*>(getEventsFireEventsPtr.get()))->cancel();
(dynamic_cast<InactivityFireEvent*>(inactivityFireEventPtr.get()))->cancel();
if (_xidp) {
@@ -174,7 +179,7 @@
JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const bool transient)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ writeLockCheck();
while(handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp,
transient)));
}
@@ -183,7 +188,7 @@
JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp,
const bool transient)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ writeLockCheck();
while(handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient)));
}
@@ -191,7 +196,7 @@
JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const std::string& xid, const bool transient)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ writeLockCheck();
while(handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len,
dtokp, xid, transient)));
}
@@ -200,7 +205,7 @@
JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
const std::string& xid, const bool transient)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ writeLockCheck();
while(handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid,
transient)));
}
@@ -208,28 +213,28 @@
void
JournalImpl::dequeue_data_record(data_tok* const dtokp)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ writeLockCheck();
while(handleIoResult(jcntl::dequeue_data_record(dtokp)));
}
void
JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ writeLockCheck();
while(handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid)));
}
void
JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ writeLockCheck();
while(handleIoResult(jcntl::txn_abort(dtokp, xid)));
}
void
JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ writeLockCheck();
while(handleIoResult(jcntl::txn_commit(dtokp, xid)));
}
@@ -289,6 +294,17 @@
journalTimer.add(inactivityFireEventPtr);
}
+void
+JournalImpl::writeLockCheck()
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ if (aioWait)
+ {
+ aioWaitLock = true;
+ writeLock.wait();
+ }
+}
+
const bool
JournalImpl::handleIoResult(const iores r)
{
@@ -296,10 +312,18 @@
switch (r)
{
case rhm::journal::RHM_IORES_SUCCESS:
+ {
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ aioWait = false;
+ }
return false;
case rhm::journal::RHM_IORES_AIO_WAIT:
- aioWait = true;
- writeLock.wait();
+ {
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ aioWait = true;
+ }
+ usleep(1000); // TODO: add counter here to limit the time spent in this loop?
+ get_wr_events();
return true;
case rhm::journal::RHM_IORES_ENQCAPTHRESH:
{
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-12-14 21:53:32 UTC (rev 1497)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-12-17 02:44:36 UTC (rev 1498)
@@ -77,6 +77,7 @@
qpid::sys::Monitor writeLock;
bool aioWait;
+ bool aioWaitLock; // monitor lock taken
// temp local vars for loadMsgContent below
void* _xidp;
@@ -153,9 +154,14 @@
void flushFire();
// Notify write monitor
- inline void notifyWriteMonitor() { if (aioWait) { aioWait = false; writeLock.notify(); } }
+ inline void notifyWriteMonitor()
+ {
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ if (aioWaitLock) { aioWaitLock = false; writeLock.notify(); }
+ }
private:
+ void writeLockCheck();
const bool handleIoResult(const journal::iores r);
static void aio_wr_callback(jcntl* journal, u_int32_t num_dtoks);
static void aio_rd_callback(jcntl* journal, u_int32_t num_dtoks);
Modified: store/trunk/cpp/tests/run-unit-tests
===================================================================
--- store/trunk/cpp/tests/run-unit-tests 2007-12-14 21:53:32 UTC (rev 1497)
+++ store/trunk/cpp/tests/run-unit-tests 2007-12-17 02:44:36 UTC (rev 1498)
@@ -3,12 +3,8 @@
. $srcdir/setup
fail=0
-
-DB_HOME=dbdata
-mkdir -p $DB_HOME
-export DB_HOME
+ulimit -c unlimited
LD_PRELOAD=$pwd/.libs/libdlclose_noop.so $vg DllPlugInTester -c -b $pwd/.libs/*Test.so 2> out || fail=1
-rm -rf $DB_HOME
vg_check out || fail=1
exit $fail
More information about the rhmessaging-commits mailing list