rhmessaging commits: r3997 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: aconway
Date: 2010-05-31 15:31:45 -0400 (Mon, 31 May 2010)
New Revision: 3997
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.h
Log:
Fix valgrind errors caused by an order-of-destruction issue.
Added a callback so that MessageStoreImpl is informed when JournalImpl
instances are deleted and can remove them from its map.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2010-05-31 14:08:28 UTC (rev 3996)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2010-05-31 19:31:45 UTC (rev 3997)
@@ -53,7 +53,8 @@
const std::string& journalBaseFilename,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout,
- qpid::management::ManagementAgent* a):
+ qpid::management::ManagementAgent* a,
+ DeleteCallback onDelete):
jcntl(journalId, journalDirectory, journalBaseFilename),
getEventsTimerSetFlag(false),
lastReadRid(0),
@@ -65,7 +66,8 @@
_dtok(),
_external(false),
_agent(a),
- _mgmtObject(0)
+ _mgmtObject(0),
+ deleteCallback(onDelete)
{
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
@@ -108,6 +110,7 @@
JournalImpl::~JournalImpl()
{
+ if (deleteCallback) deleteCallback(*this);
if (_init_flag && !_stop_flag){
try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete!
catch (const jexception& e) { log(LOG_ERROR, e.what()); }
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2010-05-31 14:08:28 UTC (rev 3996)
+++ store/trunk/cpp/lib/JournalImpl.h 2010-05-31 19:31:45 UTC (rev 3997)
@@ -1,25 +1,25 @@
/*
- Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+ Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
- This file is part of the Qpid async store library msgstore.so.
+ This file is part of the Qpid async store library msgstore.so.
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
- USA
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
- The GNU Lesser General Public License is available in the file COPYING.
- */
+ The GNU Lesser General Public License is available in the file COPYING.
+*/
#ifndef _JournalImpl_
#define _JournalImpl_
@@ -38,219 +38,228 @@
#include "qmf/com/redhat/rhm/store/Journal.h"
namespace mrg {
- namespace msgstore {
+namespace msgstore {
- class JournalImpl;
+class JournalImpl;
- class InactivityFireEvent : public qpid::sys::TimerTask
- {
- JournalImpl* _parent;
- qpid::sys::Mutex _ife_lock;
+class InactivityFireEvent : public qpid::sys::TimerTask
+{
+ JournalImpl* _parent;
+ qpid::sys::Mutex _ife_lock;
- public:
- InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::sys::TimerTask(timeout), _parent(p) {}
- virtual ~InactivityFireEvent() {}
- void fire();
- inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
- };
+ public:
+ InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
+ qpid::sys::TimerTask(timeout), _parent(p) {}
+ virtual ~InactivityFireEvent() {}
+ void fire();
+ inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
+};
- class GetEventsFireEvent : public qpid::sys::TimerTask
- {
- JournalImpl* _parent;
- qpid::sys::Mutex _gefe_lock;
+class GetEventsFireEvent : public qpid::sys::TimerTask
+{
+ JournalImpl* _parent;
+ qpid::sys::Mutex _gefe_lock;
- public:
- GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::sys::TimerTask(timeout), _parent(p) {}
- virtual ~GetEventsFireEvent() {}
- void fire();
- inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
- };
+ public:
+ GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
+ qpid::sys::TimerTask(timeout), _parent(p) {}
+ virtual ~GetEventsFireEvent() {}
+ void fire();
+ inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
+};
- class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal::jcntl, public mrg::journal::aio_callback
- {
- private:
- static qpid::sys::Mutex _static_lock;
- static qpid::sys::Timer* journalTimerPtr;
- static u_int32_t cnt;
+class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal::jcntl, public mrg::journal::aio_callback
+{
+ public:
+ typedef boost::function<void (JournalImpl&)> DeleteCallback;
+
+ private:
+ static qpid::sys::Mutex _static_lock;
+ static qpid::sys::Timer* journalTimerPtr;
+ static u_int32_t cnt;
- bool getEventsTimerSetFlag;
- boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
- qpid::sys::Mutex _getf_lock;
+ bool getEventsTimerSetFlag;
+ boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
+ qpid::sys::Mutex _getf_lock;
- u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
- std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
+ u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
+ std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
- bool writeActivityFlag;
- bool flushTriggeredFlag;
- boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr;
+ bool writeActivityFlag;
+ bool flushTriggeredFlag;
+ boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr;
- // temp local vars for loadMsgContent below
- void* _xidp;
- void* _datap;
- size_t _dlen;
- mrg::journal::data_tok _dtok;
- bool _external;
+ // temp local vars for loadMsgContent below
+ void* _xidp;
+ void* _datap;
+ size_t _dlen;
+ mrg::journal::data_tok _dtok;
+ bool _external;
- qpid::management::ManagementAgent* _agent;
- qmf::com::redhat::rhm::store::Journal* _mgmtObject;
+ qpid::management::ManagementAgent* _agent;
+ qmf::com::redhat::rhm::store::Journal* _mgmtObject;
+ DeleteCallback deleteCallback;
+
+ public:
+
+ JournalImpl(const std::string& journalId,
+ const std::string& journalDirectory,
+ const std::string& journalBaseFilename,
+ const qpid::sys::Duration getEventsTimeout,
+ const qpid::sys::Duration flushTimeout,
+ qpid::management::ManagementAgent* agent,
+ DeleteCallback deleteCallback=DeleteCallback() );
- public:
- JournalImpl(const std::string& journalId,
- const std::string& journalDirectory,
- const std::string& journalBaseFilename,
- const qpid::sys::Duration getEventsTimeout,
- const qpid::sys::Duration flushTimeout,
- qpid::management::ManagementAgent* agent);
+ virtual ~JournalImpl();
- virtual ~JournalImpl();
+ void initialize(const u_int16_t num_jfiles,
+ const bool auto_expand,
+ const u_int16_t ae_max_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks,
+ mrg::journal::aio_callback* const cbp);
- void initialize(const u_int16_t num_jfiles,
- const bool auto_expand,
- const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,
- const u_int16_t wcache_num_pages,
- const u_int32_t wcache_pgsize_sblks,
- mrg::journal::aio_callback* const cbp);
+ inline void initialize(const u_int16_t num_jfiles,
+ const bool auto_expand,
+ const u_int16_t ae_max_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks) {
+ initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
+ this);
+ }
- inline void initialize(const u_int16_t num_jfiles,
- const bool auto_expand,
- const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,
- const u_int16_t wcache_num_pages,
- const u_int32_t wcache_pgsize_sblks) {
- initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
- this);
- }
+ void recover(const u_int16_t num_jfiles,
+ const bool auto_expand,
+ const u_int16_t ae_max_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks,
+ mrg::journal::aio_callback* const cbp,
+ boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
+ u_int64_t& highest_rid,
+ u_int64_t queue_id);
- void recover(const u_int16_t num_jfiles,
- const bool auto_expand,
- const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,
- const u_int16_t wcache_num_pages,
- const u_int32_t wcache_pgsize_sblks,
- mrg::journal::aio_callback* const cbp,
- boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
- u_int64_t& highest_rid,
- u_int64_t queue_id);
+ inline void recover(const u_int16_t num_jfiles,
+ const bool auto_expand,
+ const u_int16_t ae_max_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks,
+ boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
+ u_int64_t& highest_rid,
+ u_int64_t queue_id) {
+ recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
+ this, prep_tx_list_ptr, highest_rid, queue_id);
+ }
- inline void recover(const u_int16_t num_jfiles,
- const bool auto_expand,
- const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,
- const u_int16_t wcache_num_pages,
- const u_int32_t wcache_pgsize_sblks,
- boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
- u_int64_t& highest_rid,
- u_int64_t queue_id) {
- recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
- this, prep_tx_list_ptr, highest_rid, queue_id);
- }
+ void recover_complete();
- void recover_complete();
+ // Temporary fn to read and save last msg read from journal so it can be assigned
+ // in chunks. To be replaced when coding to do this direct from the journal is ready.
+ // Returns true if the record is extern, false if local.
+ bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0);
- // Temporary fn to read and save last msg read from journal so it can be assigned
- // in chunks. To be replaced when coding to do this direct from the journal is ready.
- // Returns true if the record is extern, false if local.
- bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0);
+ // Overrides for write inactivity timer
+ void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, mrg::journal::data_tok* dtokp,
+ const bool transient = false);
- // Overrides for write inactivity timer
- void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, mrg::journal::data_tok* dtokp,
- const bool transient = false);
+ void enqueue_extern_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
+ const bool transient = false);
- void enqueue_extern_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
- const bool transient = false);
+ void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, mrg::journal::data_tok* dtokp, const std::string& xid,
+ const bool transient = false);
- void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, mrg::journal::data_tok* dtokp, const std::string& xid,
- const bool transient = false);
+ void enqueue_extern_txn_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
+ const std::string& xid, const bool transient = false);
- void enqueue_extern_txn_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
- const std::string& xid, const bool transient = false);
+ void dequeue_data_record(mrg::journal::data_tok* const dtokp, const bool txn_coml_commit = false);
- void dequeue_data_record(mrg::journal::data_tok* const dtokp, const bool txn_coml_commit = false);
+ void dequeue_txn_data_record(mrg::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
- void dequeue_txn_data_record(mrg::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
+ mrg::journal::iores read_data_record(void** const data_buff, size_t& tot_data_len, void** const xid_buff,
+ size_t& xid_len, bool& transient, bool& external, mrg::journal::data_tok* const dtokp,
+ bool ignore_pending_txns = false);
- mrg::journal::iores read_data_record(void** const data_buff, size_t& tot_data_len, void** const xid_buff,
- size_t& xid_len, bool& transient, bool& external, mrg::journal::data_tok* const dtokp,
- bool ignore_pending_txns = false);
+ void txn_abort(mrg::journal::data_tok* const dtokp, const std::string& xid);
- void txn_abort(mrg::journal::data_tok* const dtokp, const std::string& xid);
+ void txn_commit(mrg::journal::data_tok* const dtokp, const std::string& xid);
- void txn_commit(mrg::journal::data_tok* const dtokp, const std::string& xid);
+ void stop(bool block_till_aio_cmpl = false);
- void stop(bool block_till_aio_cmpl = false);
+ // Logging
+ void log(mrg::journal::log_level level, const std::string& log_stmt) const;
+ void log(mrg::journal::log_level level, const char* const log_stmt) const;
- // Logging
- void log(mrg::journal::log_level level, const std::string& log_stmt) const;
- void log(mrg::journal::log_level level, const char* const log_stmt) const;
+ // Overrides for get_events timer
+ mrg::journal::iores flush(const bool block_till_aio_cmpl = false);
- // Overrides for get_events timer
- mrg::journal::iores flush(const bool block_till_aio_cmpl = false);
+ // TimerTask callback
+ void getEventsFire();
+ void flushFire();
- // TimerTask callback
- void getEventsFire();
- void flushFire();
+ // AIO callbacks
+ virtual void wr_aio_cb(std::vector<mrg::journal::data_tok*>& dtokl);
+ virtual void rd_aio_cb(std::vector<u_int16_t>& pil);
- // AIO callbacks
- virtual void wr_aio_cb(std::vector<mrg::journal::data_tok*>& dtokl);
- virtual void rd_aio_cb(std::vector<u_int16_t>& pil);
+ qpid::management::ManagementObject* GetManagementObject (void) const
+ { return _mgmtObject; }
- qpid::management::ManagementObject* GetManagementObject (void) const
- { return _mgmtObject; }
+ qpid::management::Manageable::status_t ManagementMethod (uint32_t,
+ qpid::management::Args&,
+ std::string&);
- qpid::management::Manageable::status_t ManagementMethod (uint32_t,
- qpid::management::Args&,
- std::string&);
+ void resetDeleteCallback() { deleteCallback = DeleteCallback(); }
- private:
- void free_read_buffers();
+ private:
+ void free_read_buffers();
- inline void setGetEventTimer()
- {
- assert(journalTimerPtr != 0);
- getEventsFireEventsPtr->setupNextFire();
- journalTimerPtr->add(getEventsFireEventsPtr);
- getEventsTimerSetFlag = true;
- }
- void handleIoResult(const mrg::journal::iores r);
+ inline void setGetEventTimer()
+ {
+ assert(journalTimerPtr != 0);
+ getEventsFireEventsPtr->setupNextFire();
+ journalTimerPtr->add(getEventsFireEventsPtr);
+ getEventsTimerSetFlag = true;
+ }
+ void handleIoResult(const mrg::journal::iores r);
- // Management instrumentation callbacks overridden from jcntl
- inline void instr_incr_outstanding_aio_cnt() {
- if (_mgmtObject != 0) _mgmtObject->inc_outstandingAIOs();
- }
- inline void instr_decr_outstanding_aio_cnt() {
- if (_mgmtObject != 0) _mgmtObject->dec_outstandingAIOs();
- }
- }; // class JournalImpl
+ // Management instrumentation callbacks overridden from jcntl
+ inline void instr_incr_outstanding_aio_cnt() {
+ if (_mgmtObject != 0) _mgmtObject->inc_outstandingAIOs();
+ }
+ inline void instr_decr_outstanding_aio_cnt() {
+ if (_mgmtObject != 0) _mgmtObject->dec_outstandingAIOs();
+ }
- class TplJournalImpl : public JournalImpl
- {
- public:
- TplJournalImpl(const std::string& journalId,
- const std::string& journalDirectory,
- const std::string& journalBaseFilename,
- const qpid::sys::Duration getEventsTimeout,
- const qpid::sys::Duration flushTimeout,
- qpid::management::ManagementAgent* agent) :
- JournalImpl(journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent)
- {}
+}; // class JournalImpl
- ~TplJournalImpl() {}
+class TplJournalImpl : public JournalImpl
+{
+ public:
+ TplJournalImpl(const std::string& journalId,
+ const std::string& journalDirectory,
+ const std::string& journalBaseFilename,
+ const qpid::sys::Duration getEventsTimeout,
+ const qpid::sys::Duration flushTimeout,
+ qpid::management::ManagementAgent* agent) :
+ JournalImpl(journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent)
+ {}
- // Special version of read_data_record that ignores transactions - needed when reading the TPL
- inline mrg::journal::iores read_data_record(void** const datapp, std::size_t& dsize,
- void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
- mrg::journal::data_tok* const dtokp) {
- return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
- }
- inline void read_reset() { _rmgr.invalidate(); }
- }; // class TplJournalImpl
+ ~TplJournalImpl() {}
- } // namespace msgstore
+ // Special version of read_data_record that ignores transactions - needed when reading the TPL
+ inline mrg::journal::iores read_data_record(void** const datapp, std::size_t& dsize,
+ void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
+ mrg::journal::data_tok* const dtokp) {
+ return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
+ }
+ inline void read_reset() { _rmgr.invalidate(); }
+}; // class TplJournalImpl
+
+} // namespace msgstore
} // namespace mrg
#endif
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2010-05-31 14:08:28 UTC (rev 3996)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2010-05-31 19:31:45 UTC (rev 3997)
@@ -367,6 +367,7 @@
for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
{
JournalImpl* jQueue = i->second;
+ jQueue->resetDeleteCallback();
if (jQueue->is_ready()) jQueue->stop(true);
}
}
@@ -479,7 +480,8 @@
}
jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), std::string("JournalData"),
- defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
+ defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
+ boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
journalList[queue.getName()]=jQueue;
@@ -521,7 +523,7 @@
queue.setExternalQueueStore(0); // will delete the journal if exists
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
- journalList.erase(journalList.find(queue.getName()));
+ journalList.erase(queue.getName());
}
}
}
@@ -762,7 +764,8 @@
break;
}
jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"),
- defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
+ defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
+ boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
journalList[queueName] = jQueue;
@@ -1644,6 +1647,11 @@
std::string MessageStoreImpl::getStoreDir() const { return storeDir; }
+void MessageStoreImpl::journalDeleted(JournalImpl& j) {
+ qpid::sys::Mutex::ScopedLock sl(journalListLock);
+ journalList.erase(j.id());
+}
+
MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) :
qpid::Options(name),
numJrnlFiles(defNumJrnlFiles),
@@ -1668,7 +1676,7 @@
// "If no|false|0, the number of journal files will remain fixed (num-jfiles).")
// ("max-auto-expand-jfiles", qpid::optValue(autoJrnlExpandMaxFiles, "N"),
// "Maximum number of journal files allowed from auto-expanding; must be greater than --num-jfiles parameter.")
- ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
+ ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
"Default size for each journal file in multiples of read pages (1 read page = 64kiB)")
("truncate", qpid::optValue(truncateFlag, "yes|no"),
"If yes|true|1, will truncate the store (discard any existing records). If no|false|0, will preserve "
@@ -1687,3 +1695,4 @@
"Lower values decrease latency at the expense of throughput.")
;
}
+
Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h 2010-05-31 14:08:28 UTC (rev 3996)
+++ store/trunk/cpp/lib/MessageStoreImpl.h 2010-05-31 19:31:45 UTC (rev 3997)
@@ -149,6 +149,7 @@
qmf::com::redhat::rhm::store::Store* mgmtObject;
qpid::management::ManagementAgent* agent;
+
// Parameter validation and calculation
static u_int16_t chkJrnlNumFilesParam(const u_int16_t param,
@@ -359,6 +360,10 @@
{ return qpid::management::Manageable::STATUS_OK; }
std::string getStoreDir() const;
+
+ private:
+ void journalDeleted(JournalImpl&);
+
}; // class MessageStoreImpl
} // namespace msgstore
14 years, 6 months
rhmessaging commits: r3996 - store/trunk/cpp/tests/cluster.
by rhmessaging-commits@lists.jboss.org
Author: aconway
Date: 2010-05-31 10:08:28 -0400 (Mon, 31 May 2010)
New Revision: 3996
Modified:
store/trunk/cpp/tests/cluster/run_python_cluster_tests
Log:
Skip cluster_tests.ShortTests.test_sasl as it depends on a SASL database not available in the store build environment.
Modified: store/trunk/cpp/tests/cluster/run_python_cluster_tests
===================================================================
--- store/trunk/cpp/tests/cluster/run_python_cluster_tests 2010-05-27 18:06:48 UTC (rev 3995)
+++ store/trunk/cpp/tests/cluster/run_python_cluster_tests 2010-05-31 14:08:28 UTC (rev 3996)
@@ -28,8 +28,13 @@
echo "Running Python cluster tests..."
OUTDIR=brokertest.tmp
rm -rf $OUTDIR
-# Ignore tests requiring a store by default.
+
+# Ignore tests known to fail.
CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:-"-I ${CLUSTER_TESTS_FAIL}"}
+# Ignore tests that don't work in the store environment
+# SASL test needs sasl test database which is not installed.
+CLUSTER_TESTS_IGNORE="${CLUSTER_TESTS_IGNORE} -i cluster_tests.ShortTests.test_sasl"
+
CLUSTER_TESTS=${CLUSTER_TESTS:-$*}
TEST_CMD="${QPID_PYTHON_TEST} -m cluster_tests ${CLUSTER_TESTS_IGNORE} ${CLUSTER_TESTS} -DOUTDIR=$OUTDIR"
14 years, 6 months
rhmessaging commits: r3995 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: aconway
Date: 2010-05-27 14:06:48 -0400 (Thu, 27 May 2010)
New Revision: 3995
Modified:
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/StorePlugin.cpp
Log:
Bug 596765: Remove global shared_ptr to store in store plugin.
The global shared_ptr delays destruction of the store until after the broker is deleted, causing core dumps when unregistering management objects.
https://bugzilla.redhat.com/show_bug.cgi?id=596765
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2010-05-26 20:46:43 UTC (rev 3994)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2010-05-27 18:06:48 UTC (rev 3995)
@@ -428,11 +428,11 @@
MessageStoreImpl::~MessageStoreImpl()
{
+ finalize();
try {
for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
}
-// if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
} catch (const DbException& e) {
QPID_LOG(error, "Error closing BDB databases: " << e.what());
} catch (const journal::jexception& e) {
Modified: store/trunk/cpp/lib/StorePlugin.cpp
===================================================================
--- store/trunk/cpp/lib/StorePlugin.cpp 2010-05-26 20:46:43 UTC (rev 3994)
+++ store/trunk/cpp/lib/StorePlugin.cpp 2010-05-27 18:06:48 UTC (rev 3995)
@@ -36,16 +36,15 @@
struct StorePlugin : public Plugin {
mrg::msgstore::MessageStoreImpl::StoreOptions options;
- boost::shared_ptr<qpid::broker::MessageStore> store;
Options* getOptions() { return &options; }
void earlyInitialize (Plugin::Target& target)
{
Broker* broker = dynamic_cast<Broker*>(&target);
- store.reset(new mrg::msgstore::MessageStoreImpl ());
+ if (!broker) return;
+ boost::shared_ptr<qpid::broker::MessageStore> store(new mrg::msgstore::MessageStoreImpl ());
DataDir& dataDir = broker->getDataDir ();
-
if (options.storeDir.empty ())
{
if (!dataDir.isEnabled ())
@@ -67,8 +66,7 @@
void finalize()
{
- MessageStore* sp = store.get();
- static_cast<mrg::msgstore::MessageStoreImpl*>(sp)->finalize();
+ // This function intentionally left blank
}
const char* id() {return "StorePlugin";}
14 years, 7 months
rhmessaging commits: r3994 - in mgmt/newdata: rosemary/python/rosemary and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-05-26 16:46:43 -0400 (Wed, 26 May 2010)
New Revision: 3994
Modified:
mgmt/newdata/cumin/model/condor.xml
mgmt/newdata/rosemary/python/rosemary/model.py
Log:
Add group parsing to rosemary; fix up condor schema
Modified: mgmt/newdata/cumin/model/condor.xml
===================================================================
--- mgmt/newdata/cumin/model/condor.xml 2010-05-26 18:19:34 UTC (rev 3993)
+++ mgmt/newdata/cumin/model/condor.xml 2010-05-26 20:46:43 UTC (rev 3994)
@@ -26,7 +26,7 @@
type="sstr"
desc="The Condor version string for the daemon's version"/>
<property name="DaemonStartTime"
- type="absTime" unit="nanosecond"
+ type="absTime" unit="nanosecond" optional="y"
desc="Number of nanoseconds since epoch when the daemon
was started"/>
</group>
Modified: mgmt/newdata/rosemary/python/rosemary/model.py
===================================================================
--- mgmt/newdata/rosemary/python/rosemary/model.py 2010-05-26 18:19:34 UTC (rev 3993)
+++ mgmt/newdata/rosemary/python/rosemary/model.py 2010-05-26 20:46:43 UTC (rev 3994)
@@ -90,9 +90,15 @@
self.sql_schema = SqlSchema(self._model.sql_model, self._name)
def load(self, elem):
+ groups_by_name = dict()
+
+ for child in elem.findall("group"):
+ name = child.get("name")
+ groups_by_name[name] = child
+
for child in elem.findall("class"):
cls = RosemaryClass(self, child.get("name"))
- cls.load(child)
+ cls.load(child, groups_by_name)
def extend(self, elem):
for child in elem.findall("class"):
@@ -229,9 +235,22 @@
SqlUniqueConstraint(self.sql_table, name, cols)
- def load(self, elem):
+ def load(self, elem, groups_by_name):
log.debug("Loading %s", self)
+ for child in elem.findall("group"):
+ name = child.get("name")
+
+ try:
+ self.load_class_components(groups_by_name[name])
+ except KeyError:
+ log.error("Reference to group '%s' invalid", name)
+
+ raise
+
+ self.load_class_components(elem)
+
+ def load_class_components(self, elem):
for child in elem.findall("property"):
name = child.get("name")
14 years, 7 months
rhmessaging commits: r3993 - mgmt/newdata/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-05-26 14:19:34 -0400 (Wed, 26 May 2010)
New Revision: 3993
Modified:
mgmt/newdata/cumin/python/cumin/objectselector.py
Log:
Fixed problem with all checkboxes on an ObjectSelectionTable being checked
Modified: mgmt/newdata/cumin/python/cumin/objectselector.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/objectselector.py 2010-05-26 15:20:39 UTC (rev 3992)
+++ mgmt/newdata/cumin/python/cumin/objectselector.py 2010-05-26 18:19:34 UTC (rev 3993)
@@ -149,6 +149,11 @@
return "onclick=\"%s\"" % value
+ def render_checked_attr(self, session, record):
+ checks = self.get(session)
+ return record[self.parent.parent.field.index] in checks \
+ and "checked=\"checked\"" or ""
+
class ObjectLinkColumn(ObjectAttributeColumn, LinkColumn):
def __init__(self, app, name, attr, id_attr, frame_path):
super(ObjectLinkColumn, self).__init__(app, name, attr)
14 years, 7 months
rhmessaging commits: r3992 - mgmt/newdata/cumin/model.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-05-26 11:20:39 -0400 (Wed, 26 May 2010)
New Revision: 3992
Modified:
mgmt/newdata/cumin/model/rosemary.xml
Log:
Update to the new package name
Modified: mgmt/newdata/cumin/model/rosemary.xml
===================================================================
--- mgmt/newdata/cumin/model/rosemary.xml 2010-05-26 15:01:14 UTC (rev 3991)
+++ mgmt/newdata/cumin/model/rosemary.xml 2010-05-26 15:20:39 UTC (rev 3992)
@@ -240,7 +240,7 @@
</class>
</package>
- <package name="mrg.grid">
+ <package name="com.redhat.grid">
<class name="Slot">
<property name="JobId">
<title>Job ID</title>
14 years, 7 months
rhmessaging commits: r3991 - mgmt/newdata/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-05-26 11:01:14 -0400 (Wed, 26 May 2010)
New Revision: 3991
Modified:
mgmt/newdata/mint/python/mint/session.py
mgmt/newdata/mint/python/mint/update.py
mgmt/newdata/mint/python/mint/util.py
Log:
Handle objectProps calls with no properties set; add a class for profiling threads
Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py 2010-05-25 19:54:21 UTC (rev 3990)
+++ mgmt/newdata/mint/python/mint/session.py 2010-05-26 15:01:14 UTC (rev 3991)
@@ -101,23 +101,33 @@
# because I don't get any agent info; instead
def objectProps(self, broker, obj):
- agent = self.model.get_agent(obj._agent)
+ agent = self.model.get_agent(obj.getAgent())
- if self.model.app.update_thread.isAlive():
- if obj.getTimestamps()[2]:
- up = ObjectDelete(self.model, agent, obj)
- else:
- up = ObjectUpdate(self.model, agent, obj)
+ if not self.model.app.update_thread.isAlive():
+ return
- self.model.app.update_thread.enqueue(up)
+ # XXX objectProps is getting called even if no properties are
+ # set
+ if not obj.getProperties():
+ return
+
+ if obj.getTimestamps()[2]:
+ up = ObjectDelete(self.model, agent, obj)
+ else:
+ up = ObjectUpdate(self.model, agent, obj)
+
+ self.model.app.update_thread.enqueue(up)
+
def objectStats(self, broker, obj):
- agent = self.model.get_agent(obj._agent)
+ agent = self.model.get_agent(obj.getAgent())
- if self.model.app.update_thread.isAlive():
- up = ObjectAddSample(self.model, agent, obj)
- self.model.app.update_thread.enqueue(up)
+ if not self.model.app.update_thread.isAlive():
+ return
+ up = ObjectAddSample(self.model, agent, obj)
+ self.model.app.update_thread.enqueue(up)
+
def event(self, broker, event):
""" Invoked when an event is raised. """
pass
Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-05-25 19:54:21 UTC (rev 3990)
+++ mgmt/newdata/mint/python/mint/update.py 2010-05-26 15:01:14 UTC (rev 3991)
@@ -408,8 +408,6 @@
self.agent = agent
def do_process(self, conn, stats):
- print "Ahoy!"
-
cursor = conn.cursor()
id = self.agent.id
@@ -419,7 +417,6 @@
for cls in pkg._classes:
for obj in cls.get_selection(cursor, _qmf_agent_id=id):
obj.delete(cursor)
- print "Bam!", obj
finally:
cursor.close()
Modified: mgmt/newdata/mint/python/mint/util.py
===================================================================
--- mgmt/newdata/mint/python/mint/util.py 2010-05-25 19:54:21 UTC (rev 3990)
+++ mgmt/newdata/mint/python/mint/util.py 2010-05-26 15:01:14 UTC (rev 3991)
@@ -10,6 +10,7 @@
from getpass import getpass
from qmf.console import ObjectId
from random import sample
+from tempfile import mkstemp
from threading import Thread, Lock, RLock
from time import clock, sleep
from traceback import print_exc
@@ -180,3 +181,43 @@
def ess(num, ending="s"):
return num != 1 and ending or ""
+
+class ProfilerThread(object):
+ def calibrate(self, prof):
+ print "Calibrating"
+
+ biases = list()
+
+ for i in range(3):
+ bias = prof.calibrate(50000)
+ biases.append(bias)
+ print i, bias
+
+ prof.bias = sum(biases) / float(3)
+
+ print "Using bias %f" % prof.bias
+
+ def run(self):
+ from cProfile import Profile
+ from pstats import Stats
+
+ prof = Profile()
+
+ if hasattr(prof, "calibrate"):
+ self.calibrate(prof)
+
+ prof.runctx("self.do_run()",
+ globals=globals(),
+ locals=locals())
+
+ fd, path = mkstemp(".profile")
+
+ prof.dump_stats(path)
+
+ stats = Stats(path)
+
+ stats.sort_stats("cumulative").print_stats(15)
+ stats.sort_stats("time").print_stats(15)
+
+ def do_run(self):
+ raise Exception("Not implemented")
14 years, 7 months
rhmessaging commits: r3990 - mgmt/newdata/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-05-25 15:54:21 -0400 (Tue, 25 May 2010)
New Revision: 3990
Modified:
mgmt/newdata/cumin/python/cumin/objecttask.py
Log:
Use RosemaryObject's get_title() to get the object name for display in the ObjectTaskForm
Modified: mgmt/newdata/cumin/python/cumin/objecttask.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/objecttask.py 2010-05-25 18:58:45 UTC (rev 3989)
+++ mgmt/newdata/cumin/python/cumin/objecttask.py 2010-05-25 19:54:21 UTC (rev 3990)
@@ -88,26 +88,6 @@
def do_invoke(self, invoc, obj, *args):
pass
- def amqp_call(self, vhost, callback):
- db_conn = self.app.database.get_connection()
- cursor = db_conn.cursor()
-
- cls = self.app.model.org_apache_qpid_broker.Broker
- broker = cls.get_object_by_id(cursor, vhost._brokerRef_id)
- port = broker.port
-
- cls = self.app.model.org_apache_qpid_broker.System
- system = cls.get_object_by_id(cursor, broker._systemRef_id)
- host = system.nodeName
-
- sock = connect(host, port)
- conn = Connection(sock)
- conn.start()
- sess = conn.session()
-
- callback(sess)
- conn.close()
-
def qmf_call(self, invoc, obj, meth, *args):
def completion(status_code, output_args):
invoc.status_code = status_code
@@ -317,7 +297,7 @@
return super(ObjectTaskForm, self).render_content(session)
obj = self.object.get(session)
- return obj.name
+ return obj.get_title()
class SelectionTaskForm(TaskForm):
def __init__(self, app, name, task):
14 years, 7 months
rhmessaging commits: r3989 - mgmt/newdata/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-05-25 14:58:45 -0400 (Tue, 25 May 2010)
New Revision: 3989
Modified:
mgmt/newdata/cumin/python/cumin/objecttask.py
Log:
Display the object name or fields in the ObjectTaskForm
Modified: mgmt/newdata/cumin/python/cumin/objecttask.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/objecttask.py 2010-05-25 18:27:46 UTC (rev 3988)
+++ mgmt/newdata/cumin/python/cumin/objecttask.py 2010-05-25 18:58:45 UTC (rev 3989)
@@ -313,6 +313,9 @@
return self.task.get_title(session)
def render_content(self, session):
+ if len(self.main_fields.fields):
+ return super(ObjectTaskForm, self).render_content(session)
+
obj = self.object.get(session)
return obj.name
14 years, 7 months
rhmessaging commits: r3988 - mgmt/newdata/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-05-25 14:27:46 -0400 (Tue, 25 May 2010)
New Revision: 3988
Modified:
mgmt/newdata/cumin/python/cumin/objecttask.py
Log:
Display the object name in the ObjectTaskForm
Modified: mgmt/newdata/cumin/python/cumin/objecttask.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/objecttask.py 2010-05-25 16:44:22 UTC (rev 3987)
+++ mgmt/newdata/cumin/python/cumin/objecttask.py 2010-05-25 18:27:46 UTC (rev 3988)
@@ -312,6 +312,10 @@
def render_title(self, session):
return self.task.get_title(session)
+ def render_content(self, session):
+ obj = self.object.get(session)
+ return obj.name
+
class SelectionTaskForm(TaskForm):
def __init__(self, app, name, task):
super(SelectionTaskForm, self).__init__(app, name, task)
14 years, 7 months