[rhmessaging-commits] rhmessaging commits: r1711 - in store/trunk/cpp: lib/jrnl and 2 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Wed Feb 20 10:58:08 EST 2008
Author: kpvdr
Date: 2008-02-20 10:58:08 -0500 (Wed, 20 Feb 2008)
New Revision: 1711
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/enq_map.cpp
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp
store/trunk/cpp/tests/jrnl/run-journal-tests
Log:
Bugfix for intermittent JMS failures; also added scoped locks to the emap and tmap classes, plus some bug fixes to the Python tools.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-02-20 15:58:08 UTC (rev 1711)
@@ -48,8 +48,8 @@
static const u_int8_t MESSAGE_MESSAGE = 1;
static const u_int8_t BASIC_MESSAGE = 2;
bool BdbMessageStore::useAsync;
-qpid::sys::Duration BdbMessageStore::defJournalGetEventsTimeout(1000000); // 1ms
-qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(1000000000); // 1 sec
+qpid::sys::Duration BdbMessageStore::defJournalGetEventsTimeout(10 * qpid::sys::TIME_MSEC); // 10ms
+qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
unsigned int TxnCtxt::count = 0;
qpid::sys::Mutex TxnCtxt::globalSerialiser;
@@ -274,9 +274,8 @@
{
JournalImpl* jQueue = static_cast<JournalImpl*>(eqs);
jQueue->delete_jrnl_files();
- queue.setExternalQueueStore(NULL); // will delete the journal if exists
+ queue.setExternalQueueStore(0); // will delete the journal if exists
}
-
}
void BdbMessageStore::create(const PersistableExchange& exchange)
@@ -906,7 +905,9 @@
void BdbMessageStore::flush(const qpid::broker::PersistableQueue& queue)
{
if (!usingJrnl()) return;
+ if (queue.getExternalQueueStore() == 0) return;
checkInit();
+ std::string qn = queue.getName();
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
if (jc){
@@ -914,7 +915,7 @@
/*rhm::journal::iores res =*/ jc->flush();
}
}catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": flush() failed: " + e.what() );
+ THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() );
}
}
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-02-20 15:58:08 UTC (rev 1711)
@@ -25,6 +25,7 @@
#include "jrnl/jerrno.hpp"
#include "jrnl/jexception.hpp"
+#include "jrnl/slock.hpp"
#include "StoreException.h"
#include <qpid/sys/Monitor.h>
@@ -56,6 +57,7 @@
_dtok(),
_external(false)
{
+ ::pthread_mutex_init(&_getf_mutex, 0);
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
journalTimer.start();
@@ -77,6 +79,7 @@
::free(_datap);
_datap = 0;
}
+ ::pthread_mutex_destroy(&_getf_mutex);
}
void
@@ -232,13 +235,12 @@
}
const iores
-JournalImpl::flush()
+JournalImpl::flush(const bool block_till_aio_cmpl)
{
- const iores res = jcntl::flush();
- if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) {
- getEventsFireEventsPtr->addRef();
- journalTimer.add(getEventsFireEventsPtr);
- getEventsTimerSetFlag = true;
+ const iores res = jcntl::flush(block_till_aio_cmpl);
+ {
+ slock s(&_getf_mutex);
+ if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); }
}
return res;
}
@@ -246,22 +248,12 @@
void
JournalImpl::getEventsFire()
{
- if (_wmgr.get_aio_evt_rem()) {
- try { jcntl::get_wr_events(); }
- catch (const jexception& e)
- {
- // Another thread has already called get_wr_events() and it is still busy, ignore
- if (e.err_code() != jerrno::JERR__PTHREAD) {
- throw;
- }
- }
- }
+ slock s(&_getf_mutex);
getEventsTimerSetFlag = false;
if (_wmgr.get_aio_evt_rem()) {
- getEventsFireEventsPtr->addRef();
- journalTimer.add(getEventsFireEventsPtr);
- getEventsTimerSetFlag = true;
+ jcntl::get_wr_events();
}
+ if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
}
void
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/lib/JournalImpl.h 2008-02-20 15:58:08 UTC (rev 1711)
@@ -70,6 +70,7 @@
bool getEventsTimerSetFlag;
qpid::intrusive_ptr<qpid::broker::TimerTask> getEventsFireEventsPtr;
+ pthread_mutex_t _getf_mutex; // getEventsTimerSetFlag mutex
bool writeActivityFlag;
bool flushTriggeredFlag;
@@ -135,13 +136,19 @@
void stop(bool block_till_aio_cmpl = false);
// Overrides for get_events timer
- const journal::iores flush();
+ const journal::iores flush(const bool block_till_aio_cmpl = false);
// TimerTask callback
void getEventsFire();
void flushFire();
private:
+ inline void setGetEventTimer()
+ {
+ getEventsFireEventsPtr->addRef();
+ journalTimer.add(getEventsFireEventsPtr);
+ getEventsTimerSetFlag = true;
+ }
void handleIoResult(const journal::iores r);
static void aio_wr_callback(jcntl* journal, std::vector<journal::data_tok*>& dtokl);
// static void aio_rd_callback(jcntl* journal, std::vector<u_int16_t>& pil);
Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp 2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp 2008-02-20 15:58:08 UTC (rev 1711)
@@ -33,6 +33,7 @@
#include <iomanip>
#include <sstream>
#include <jrnl/jerrno.hpp>
+#include <jrnl/slock.hpp>
namespace rhm
@@ -60,10 +61,12 @@
void
enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid, const bool locked)
{
+ std::pair<emap_itr, bool> ret;
fid_lock_pair rec(fid, locked);
- pthread_mutex_lock(&_mutex);
- std::pair<emap_itr, bool> ret = _map.insert(emap_param(rid, rec));
- pthread_mutex_unlock(&_mutex);
+ {
+ slock s(&_mutex);
+ ret = _map.insert(emap_param(rid, rec));
+ }
if (ret.second == false)
{
std::ostringstream oss;
@@ -75,9 +78,11 @@
const u_int16_t
enq_map::get_fid(const u_int64_t rid)
{
- pthread_mutex_lock(&_mutex);
- emap_itr itr = _map.find(rid);
- pthread_mutex_unlock(&_mutex);
+ emap_itr itr;
+ {
+ slock s(&_mutex);
+ itr = _map.find(rid);
+ }
if (itr == _map.end()) // not found in map
{
std::ostringstream oss;
@@ -96,34 +101,33 @@
const u_int16_t
enq_map::get_remove_fid(const u_int64_t rid, const bool txn_flag)
{
- pthread_mutex_lock(&_mutex);
+ slock s(&_mutex);
emap_itr itr = _map.find(rid);
if (itr == _map.end()) // not found in map
{
- pthread_mutex_unlock(&_mutex);
std::ostringstream oss;
oss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "enq_map", "get_remove_fid");
}
if (itr->second.second && !txn_flag) // locked, but not a commit/abort
{
- pthread_mutex_unlock(&_mutex);
std::ostringstream oss;
oss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "enq_map", "get_remove_fid");
}
u_int16_t fid = itr->second.first;
_map.erase(itr);
- pthread_mutex_unlock(&_mutex);
return fid;
}
const bool
enq_map::is_enqueued(const u_int64_t rid)
{
- pthread_mutex_lock(&_mutex);
- emap_itr itr = _map.find(rid);
- pthread_mutex_unlock(&_mutex);
+ emap_itr itr;
+ {
+ slock s(&_mutex);
+ itr = _map.find(rid);
+ }
if (itr == _map.end()) // not found in map
return false;
if (itr->second.second) // locked
@@ -134,41 +138,39 @@
void
enq_map::lock(const u_int64_t rid)
{
- pthread_mutex_lock(&_mutex);
+ slock s(&_mutex);
emap_itr itr = _map.find(rid);
if (itr == _map.end()) // not found in map
{
- pthread_mutex_unlock(&_mutex);
std::ostringstream oss;
oss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "enq_map", "get_remove_fid");
}
itr->second.second = true;
- pthread_mutex_unlock(&_mutex);
}
void
enq_map::unlock(const u_int64_t rid)
{
- pthread_mutex_lock(&_mutex);
+ slock s(&_mutex);
emap_itr itr = _map.find(rid);
if (itr == _map.end()) // not found in map
{
- pthread_mutex_unlock(&_mutex);
std::ostringstream oss;
oss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "enq_map", "get_remove_fid");
}
itr->second.second = false;
- pthread_mutex_unlock(&_mutex);
}
const bool
enq_map::is_locked(const u_int64_t rid)
{
- pthread_mutex_lock(&_mutex);
- emap_itr itr = _map.find(rid);
- pthread_mutex_unlock(&_mutex);
+ emap_itr itr;
+ {
+ slock s(&_mutex);
+ itr = _map.find(rid);
+ }
if (itr == _map.end()) // not found in map
{
std::ostringstream oss;
@@ -182,20 +184,22 @@
enq_map::rid_list(std::vector<u_int64_t>& rv)
{
rv.clear();
- pthread_mutex_lock(&_mutex);
- for (emap_itr itr = _map.begin(); itr != _map.end(); itr++)
- rv.push_back(itr->first);
- pthread_mutex_unlock(&_mutex);
+ {
+ slock s(&_mutex);
+ for (emap_itr itr = _map.begin(); itr != _map.end(); itr++)
+ rv.push_back(itr->first);
+ }
}
void
enq_map::fid_list(std::vector<u_int16_t>& fv)
{
fv.clear();
- pthread_mutex_lock(&_mutex);
- for (emap_itr itr = _map.begin(); itr != _map.end(); itr++)
- fv.push_back(itr->second.first);
- pthread_mutex_unlock(&_mutex);
+ {
+ slock s(&_mutex);
+ for (emap_itr itr = _map.begin(); itr != _map.end(); itr++)
+ fv.push_back(itr->second.first);
+ }
}
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2008-02-20 15:58:08 UTC (rev 1711)
@@ -34,6 +34,7 @@
#include <sstream>
#include <jrnl/jerrno.hpp>
#include <jrnl/jexception.hpp>
+#include <jrnl/slock.hpp>
namespace rhm
{
@@ -64,7 +65,7 @@
txn_map::insert_txn_data(const std::string& xid, const txn_data& td)
{
bool ok = true;
- pthread_mutex_lock(&_mutex);
+ slock s(&_mutex);
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
{
@@ -76,16 +77,17 @@
}
else
itr->second.push_back(td);
- pthread_mutex_unlock(&_mutex);
return ok;
}
const txn_data_list
txn_map::get_tdata_list(const std::string& xid)
{
- pthread_mutex_lock(&_mutex);
- xmap_itr itr = _map.find(xid);
- pthread_mutex_unlock(&_mutex);
+ xmap_itr itr;
+ {
+ slock s(&_mutex);
+ itr = _map.find(xid);
+ }
if (itr == _map.end()) // not found in map
{
std::ostringstream oss;
@@ -98,11 +100,10 @@
const txn_data_list
txn_map::get_remove_tdata_list(const std::string& xid)
{
- pthread_mutex_lock(&_mutex);
+ slock s(&_mutex);
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
{
- pthread_mutex_unlock(&_mutex);
std::ostringstream oss;
oss << std::hex << "xid=" << xid_format(xid);
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map",
@@ -110,16 +111,17 @@
}
txn_data_list list = itr->second;
_map.erase(itr);
- pthread_mutex_unlock(&_mutex);
return list;
}
const bool
txn_map::in_map(const std::string& xid)
{
- pthread_mutex_lock(&_mutex);
- xmap_itr itr = _map.find(xid);
- pthread_mutex_unlock(&_mutex);
+ xmap_itr itr;
+ {
+ slock s(&_mutex);
+ itr = _map.find(xid);
+ }
if (itr == _map.end()) // not found in map
return false;
return true;
@@ -128,9 +130,11 @@
const u_int32_t
txn_map::get_rid_count(const std::string& xid)
{
- pthread_mutex_lock(&_mutex);
- xmap_itr itr = _map.find(xid);
- pthread_mutex_unlock(&_mutex);
+ xmap_itr itr;
+ {
+ slock s(&_mutex);
+ itr = _map.find(xid);
+ }
if (itr == _map.end()) // not found in map
{
std::ostringstream oss;
@@ -143,11 +147,10 @@
const bool
txn_map::is_txn_synced(const std::string& xid)
{
- pthread_mutex_lock(&_mutex);
+ slock s(&_mutex);
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
{
- pthread_mutex_unlock(&_mutex);
std::ostringstream oss;
oss << std::hex << "xid=" << xid_format(xid);
throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "is_txn_synced");
@@ -161,7 +164,6 @@
break;
}
}
- pthread_mutex_unlock(&_mutex);
return is_synced;
}
@@ -170,23 +172,24 @@
{
bool ok = true;
bool found = false;
- pthread_mutex_lock(&_mutex);
- xmap_itr itr = _map.find(xid);
- if (itr == _map.end()) // not found in map
- ok = false;
- else
{
- for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
+ slock s(&_mutex);
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // not found in map
+ ok = false;
+ else
{
- if (litr->_rid == rid)
+ for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
{
- found = true;
- litr->_aio_compl = true;
- break;
+ if (litr->_rid == rid)
+ {
+ found = true;
+ litr->_aio_compl = true;
+ break;
+ }
}
}
}
- pthread_mutex_unlock(&_mutex);
if (ok && !found)
{
std::ostringstream oss;
@@ -200,10 +203,11 @@
txn_map::xid_list(std::vector<std::string>& xv)
{
xv.clear();
- pthread_mutex_lock(&_mutex);
- for (xmap_itr itr = _map.begin(); itr != _map.end(); itr++)
- xv.push_back(itr->first);
- pthread_mutex_unlock(&_mutex);
+ {
+ slock s(&_mutex);
+ for (xmap_itr itr = _map.begin(); itr != _map.end(); itr++)
+ xv.push_back(itr->first);
+ }
}
// static fn
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-02-20 15:58:08 UTC (rev 1711)
@@ -37,6 +37,8 @@
#include <jrnl/jcntl.hpp>
#include <jrnl/jerrno.hpp>
+#include <iostream> // debug
+
namespace rhm
{
namespace journal
Modified: store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py 2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py 2008-02-20 15:58:08 UTC (rev 1711)
@@ -210,7 +210,7 @@
f.read(rem_in_blk(f, dblk_size))
def check(self):
- if self.empty() or self.magic[-1] not in ['a', 'c', 'd', 'e', 'f', 'x']:
+ if self.empty() or self.magic[:3] != 'RHM' or self.magic[-1] not in ['a', 'c', 'd', 'e', 'f', 'x']:
return True
if self.ver != hdr_ver and self.magic[-1] != 'x':
raise Exception('%s: Invalid header version: found %d, expected %d.' % (self, self.ver, hdr_ver))
Modified: store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp 2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp 2008-02-20 15:58:08 UTC (rev 1711)
@@ -98,8 +98,13 @@
oss << path << "jfile_chk.py -q";
oss << " -d " << jpp->jdir();
oss << " -b " << jpp->base_filename();
- oss << " -c" << _args.test_case_csv_file_name;
- oss << " -t" << (*tci)->test_case_num();
+ // TODO: When jfile_chk.py can handle previously recovered journals for
+ // specific tests, then remove this exclusion.
+ if (!_args.recover_mode)
+ {
+ oss << " -c" << _args.test_case_csv_file_name;
+ oss << " -t" << (*tci)->test_case_num();
+ }
bool res = system(oss.str().c_str()) == 0;
(*tci)->set_fmt_chk_res(res, jpp->jid());
}
Modified: store/trunk/cpp/tests/jrnl/run-journal-tests
===================================================================
--- store/trunk/cpp/tests/jrnl/run-journal-tests 2008-02-20 14:55:14 UTC (rev 1710)
+++ store/trunk/cpp/tests/jrnl/run-journal-tests 2008-02-20 15:58:08 UTC (rev 1711)
@@ -6,8 +6,38 @@
# Run jtt using default test set
echo
+echo "***** Mode 1: New journal instance, no recover, single journal *****"
+rm -rf /tmp/test_0*
$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --format-chk || fail = 1
echo
+#echo "***** Mode 2: New journal instance, no recover, multiple journals *****"
+#rm -rf /tmp/test_0*
+#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --format-chk --num-jrnls 5 || fail = 1
+#echo
+#echo "***** Mode 3: Reuse journal instance, no recover, single journal *****"
+#rm -rf /tmp/test_0*
+#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --format-chk || fail = 1
+#echo
+#echo "***** Mode 4: Reuse journal instance, no recover, multiple journals *****"
+#rm -rf /tmp/test_0*
+#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --format-chk --num-jrnls 5 || fail = 1
+#echo
+#echo "***** Mode 5: New journal instance, recover previous test journal, single journal *****"
+#rm -rf /tmp/test_0*
+#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --recover-mode --format-chk || fail = 1
+#echo
+#echo "***** Mode 6: New journal instance, recover previous test journal, multiple journals *****"
+#rm -rf /tmp/test_0*
+#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --recover-mode --format-chk --num-jrnls 5 || fail = 1
+#echo
+#echo "***** Mode 7: Reuse journal instance, recover previous test journal, single journal *****"
+#rm -rf /tmp/test_0*
+#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --recover-mode --format-chk || fail = 1
+#echo
+#echo "***** Mode 8: Reuse journal instance, recover previous test journal, multiple journals *****"
+#rm -rf /tmp/test_0*
+#$pwd/jtt/jtt --csv $pwd/jtt/jtt.csv --reuse-instance --recover-mode --format-chk --num-jrnls 5 || fail = 1
+#echo
# Run cpp-unit tests
LD_PRELOAD=$pwd/.libs/libdlclose_noop.so DllPlugInTester -c -b $pwd/.libs/Journal*Tests.so || fail=1
More information about the rhmessaging-commits
mailing list