rhmessaging commits: r4491 - store/trunk/cpp/lib/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2012-01-25 14:35:43 -0500 (Wed, 25 Jan 2012)
New Revision: 4491
Modified:
store/trunk/cpp/lib/jrnl/jdir.cpp
Log:
Minor fix to jdir: now able to handle symbolic links.
Modified: store/trunk/cpp/lib/jrnl/jdir.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.cpp 2012-01-06 20:41:19 UTC (rev 4490)
+++ store/trunk/cpp/lib/jrnl/jdir.cpp 2012-01-25 19:35:43 UTC (rev 4491)
@@ -284,15 +284,13 @@
if (std::strcmp(entry->d_name, ".") != 0 && std::strcmp(entry->d_name, "..") != 0)
{
std::string full_name(dirname + "/" + entry->d_name);
- if (::stat(full_name.c_str(), &s))
+ if (::lstat(full_name.c_str(), &s))
{
::closedir(dir);
std::ostringstream oss;
oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "delete_dir");
}
- // FIXME: This fn does not handle symbolic links correctly and throws up
- // For some reason, S_ISLNK() fails to identify links correctly.
if (S_ISREG(s.st_mode) || S_ISLNK(s.st_mode)) // This is a file or slink
{
if(::unlink(full_name.c_str()))
@@ -412,7 +410,7 @@
// Throw for any other condition
std::ostringstream oss;
oss << "file=\"" << name << "\"" << FORMAT_SYSERR(errno);
- throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "is_dir");
+ throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "exists");
}
return true;
}
12 years, 11 months
rhmessaging commits: r4490 - store/trunk/cpp/tools/qpidstore.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2012-01-06 15:41:19 -0500 (Fri, 06 Jan 2012)
New Revision: 4490
Modified:
store/trunk/cpp/tools/qpidstore/janal.py
Log:
BZ 772326 - Store tool store_chk fails with "ValueError: too many values to unpack" when transactional abort encountered in journal
Modified: store/trunk/cpp/tools/qpidstore/janal.py
===================================================================
--- store/trunk/cpp/tools/qpidstore/janal.py 2012-01-04 19:28:32 UTC (rev 4489)
+++ store/trunk/cpp/tools/qpidstore/janal.py 2012-01-06 20:41:19 UTC (rev 4490)
@@ -219,9 +219,9 @@
def _abort(self, xid):
"""Perform an abort operation for the given xid record"""
- for fid, hdr in self.__map[xid]:
+ for fid, hdr, lock in self.__map[xid]:
if isinstance(hdr, jrnl.DeqRec):
- self.__emap.unlock(hdr.rid)
+ self.__emap.unlock(hdr.deq_rid)
del self.__map[xid]
def _commit(self, xid):
12 years, 11 months
rhmessaging commits: r4489 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2012-01-04 14:28:32 -0500 (Wed, 04 Jan 2012)
New Revision: 4489
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
Log:
BZ 769828: Added a high-level lock on the JournalImpl::loadMsgContent() method and reworked the logic of the read invalidation. Tests show that this works correctly under multi-threaded conditions, but is VERY SLOW. This was not designed with multiple read contexts in mind, so there is a lot of thrashing and re-reading, but this should be thread-safe now. Ultimately the flow-to-disk facility which uses this "feature" needs a re-design.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2012-01-04 17:00:47 UTC (rev 4488)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2012-01-04 19:28:32 UTC (rev 4489)
@@ -257,6 +257,7 @@
bool
JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset)
{
+ qpid::sys::Mutex::ScopedLock sl(_read_lock);
if (_dtok.rid() != rid)
{
// Free any previous msg
@@ -274,8 +275,11 @@
// jumpover points and allow the read to jump back to the first known jumpover point - but this needs
// a mechanism in rrfc to accomplish it. Also helpful is a struct containing a journal address - a
// combination of lid/offset.
- if (oooFlag || rid < lastReadRid)
+ // NOTE: The second part of the if stmt (rid < lastReadRid) is required to handle browsing.
+ if (oooFlag || rid < lastReadRid) {
_rmgr.invalidate();
+ oooRidList.clear();
+ }
_dlen = 0;
_dtok.reset();
_dtok.set_wstate(DataTokenImpl::ENQ);
@@ -285,7 +289,6 @@
bool transient = false;
bool done = false;
bool rid_found = false;
- oooRidList.clear();
while (!done) {
iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient, _external, &_dtok);
switch (res) {
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2012-01-04 17:00:47 UTC (rev 4488)
+++ store/trunk/cpp/lib/JournalImpl.h 2012-01-04 19:28:32 UTC (rev 4489)
@@ -83,6 +83,7 @@
bool getEventsTimerSetFlag;
boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
qpid::sys::Mutex _getf_lock;
+ qpid::sys::Mutex _read_lock;
u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
12 years, 11 months
rhmessaging commits: r4488 - in store/trunk/cpp: tests/federation and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2012-01-04 12:00:47 -0500 (Wed, 04 Jan 2012)
New Revision: 4488
Modified:
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.h
store/trunk/cpp/tests/federation/federation_tests_env.sh
Log:
Fix python path issue for python tests. Other minor fixes and tidy-ups
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2011-12-16 15:29:51 UTC (rev 4487)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2012-01-04 17:00:47 UTC (rev 4488)
@@ -705,12 +705,13 @@
//recover transactions:
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
+ const PreparedTransaction pt = *i;
if (mgmtObject != 0) {
mgmtObject->inc_tplTransactionDepth();
mgmtObject->inc_tplTxnPrepares();
}
- std::string xid = i->xid;
+ std::string xid = pt.xid;
// Restore data token state in TxnCtxt
TplRecoverMapCitr citr = tplRecoverMap.find(xid);
@@ -729,14 +730,14 @@
qpid::broker::RecoverableTransaction::shared_ptr dtx;
if (!incomplTplTxnFlag) dtx = registry.recoverTransaction(xid, txn);
- if (i->enqueues.get()) {
- for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
+ if (pt.enqueues.get()) {
+ for (LockedMappings::iterator j = pt.enqueues->begin(); j != pt.enqueues->end(); j++) {
tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
if (!incomplTplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
}
}
- if (i->dequeues.get()) {
- for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
+ if (pt.dequeues.get()) {
+ for (LockedMappings::iterator j = pt.dequeues->begin(); j != pt.dequeues->end(); j++) {
tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
if (!incomplTplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
}
@@ -751,13 +752,13 @@
opcc->recoverDtok(citr->second.rid, xid);
opcc->prepare(tplStorePtr.get());
- if (i->enqueues.get()) {
- for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
+ if (pt.enqueues.get()) {
+ for (LockedMappings::iterator j = pt.enqueues->begin(); j != pt.enqueues->end(); j++) {
opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
}
}
- if (i->dequeues.get()) {
- for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
+ if (pt.dequeues.get()) {
+ for (LockedMappings::iterator j = pt.dequeues->begin(); j != pt.dequeues->end(); j++) {
opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
}
}
Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h 2011-12-16 15:29:51 UTC (rev 4487)
+++ store/trunk/cpp/lib/MessageStoreImpl.h 2012-01-04 17:00:47 UTC (rev 4488)
@@ -334,8 +334,8 @@
void loadContent(const qpid::broker::PersistableQueue& queue,
const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
std::string& data,
- u_int64_t offset,
- u_int32_t length);
+ uint64_t offset,
+ uint32_t length);
void enqueue(qpid::broker::TransactionContext* ctxt,
const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
Modified: store/trunk/cpp/tests/federation/federation_tests_env.sh
===================================================================
--- store/trunk/cpp/tests/federation/federation_tests_env.sh 2011-12-16 15:29:51 UTC (rev 4487)
+++ store/trunk/cpp/tests/federation/federation_tests_env.sh 2012-01-04 17:00:47 UTC (rev 4488)
@@ -114,6 +114,20 @@
return 0
}
+func_set_python_env()
+#--------------------
+# Set up the python path
+# Params: None
+# Returns: Nothing
+{
+ if test "${QPID_DIR}" -a -d "${QPID_DIR}" ; then
+ QPID_PYTHON=${QPID_DIR}/python
+ QPID_TOOLS=${QPID_DIR}/tools/src/py
+ QMF_LIB=${QPID_DIR}/extras/qmf/src/py
+ export PYTHONPATH=${QPID_PYTHON}:${QMF_LIB}:${QPID_TOOLS}:$PYTHONPATH
+ fi
+}
+
func_set_env ()
#--------------
# Set up the environment based on value of ${QPID_DIR}: if ${QPID_DIR} exists, assume a svn checkout,
@@ -277,6 +291,7 @@
#--- Start of script ---
+func_set_python_env
func_check_required_env || exit 1 # Cannot run, exit with error
func_check_qpid_python || exit 1 # Cannot run, exit with error
func_check_clustering # Warning
12 years, 11 months