[rhmessaging-commits] rhmessaging commits: r3997 - store/trunk/cpp/lib.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon May 31 15:31:46 EDT 2010


Author: aconway
Date: 2010-05-31 15:31:45 -0400 (Mon, 31 May 2010)
New Revision: 3997

Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/MessageStoreImpl.cpp
   store/trunk/cpp/lib/MessageStoreImpl.h
Log:
Fix valgrind errors caused by order of destruction issue.

Added a callback so that MessageStoreImpl is informed when JournalImpl
instances are deleted and can remove them from its map.


Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2010-05-31 14:08:28 UTC (rev 3996)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2010-05-31 19:31:45 UTC (rev 3997)
@@ -53,7 +53,8 @@
                          const std::string& journalBaseFilename,
                          const qpid::sys::Duration getEventsTimeout,
                          const qpid::sys::Duration flushTimeout,
-                         qpid::management::ManagementAgent* a):
+                         qpid::management::ManagementAgent* a,
+                         DeleteCallback onDelete):
                          jcntl(journalId, journalDirectory, journalBaseFilename),
                          getEventsTimerSetFlag(false),
                          lastReadRid(0),
@@ -65,7 +66,8 @@
                          _dtok(),
                          _external(false),
                          _agent(a),
-                         _mgmtObject(0)
+                         _mgmtObject(0),
+                         deleteCallback(onDelete)
 {
     getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
     inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
@@ -108,6 +110,7 @@
 
 JournalImpl::~JournalImpl()
 {
+    if (deleteCallback) deleteCallback(*this);
     if (_init_flag && !_stop_flag){
     	try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete!
         catch (const jexception& e) { log(LOG_ERROR, e.what()); }

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2010-05-31 14:08:28 UTC (rev 3996)
+++ store/trunk/cpp/lib/JournalImpl.h	2010-05-31 19:31:45 UTC (rev 3997)
@@ -1,25 +1,25 @@
 /*
- Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+  Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
 
- This file is part of the Qpid async store library msgstore.so.
+  This file is part of the Qpid async store library msgstore.so.
 
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either
+  version 2.1 of the License, or (at your option) any later version.
 
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- Lesser General Public License for more details.
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
 
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
- USA
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
+  USA
 
- The GNU Lesser General Public License is available in the file COPYING.
- */
+  The GNU Lesser General Public License is available in the file COPYING.
+*/
 
 #ifndef _JournalImpl_
 #define _JournalImpl_
@@ -38,219 +38,228 @@
 #include "qmf/com/redhat/rhm/store/Journal.h"
 
 namespace mrg {
-    namespace msgstore {
+namespace msgstore {
 
-        class JournalImpl;
+class JournalImpl;
 
-        class InactivityFireEvent : public qpid::sys::TimerTask
-        {
-            JournalImpl* _parent;
-            qpid::sys::Mutex _ife_lock;
+class InactivityFireEvent : public qpid::sys::TimerTask
+{
+    JournalImpl* _parent;
+    qpid::sys::Mutex _ife_lock;
 
-        public:
-	        InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
-                qpid::sys::TimerTask(timeout), _parent(p) {}
-            virtual ~InactivityFireEvent() {}
-            void fire();
-            inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
-        };
+  public:
+    InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
+        qpid::sys::TimerTask(timeout), _parent(p) {}
+    virtual ~InactivityFireEvent() {}
+    void fire();
+    inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
+};
 
-        class GetEventsFireEvent : public qpid::sys::TimerTask
-        {
-            JournalImpl* _parent;
-            qpid::sys::Mutex _gefe_lock;
+class GetEventsFireEvent : public qpid::sys::TimerTask
+{
+    JournalImpl* _parent;
+    qpid::sys::Mutex _gefe_lock;
 
-        public:
-	        GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
-                qpid::sys::TimerTask(timeout), _parent(p) {}
-            virtual ~GetEventsFireEvent() {}
-            void fire();
-            inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
-        };
+  public:
+    GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
+        qpid::sys::TimerTask(timeout), _parent(p) {}
+    virtual ~GetEventsFireEvent() {}
+    void fire();
+    inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
+};
 
-        class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal::jcntl, public mrg::journal::aio_callback
-        {
-        private:
-            static qpid::sys::Mutex _static_lock;
-            static qpid::sys::Timer* journalTimerPtr;
-            static u_int32_t cnt;
+class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal::jcntl, public mrg::journal::aio_callback
+{
+  public:
+    typedef boost::function<void (JournalImpl&)> DeleteCallback;
+    
+  private:
+    static qpid::sys::Mutex _static_lock;
+    static qpid::sys::Timer* journalTimerPtr;
+    static u_int32_t cnt;
 
-            bool getEventsTimerSetFlag;
-            boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
-            qpid::sys::Mutex _getf_lock;
+    bool getEventsTimerSetFlag;
+    boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
+    qpid::sys::Mutex _getf_lock;
 
-            u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
-            std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
+    u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
+    std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
 
-            bool writeActivityFlag;
-            bool flushTriggeredFlag;
-            boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr;
+    bool writeActivityFlag;
+    bool flushTriggeredFlag;
+    boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr;
 
-            // temp local vars for loadMsgContent below
-            void* _xidp;
-            void* _datap;
-            size_t _dlen;
-            mrg::journal::data_tok _dtok;
-            bool _external;
+    // temp local vars for loadMsgContent below
+    void* _xidp;
+    void* _datap;
+    size_t _dlen;
+    mrg::journal::data_tok _dtok;
+    bool _external;
 
-            qpid::management::ManagementAgent* _agent;
-            qmf::com::redhat::rhm::store::Journal* _mgmtObject;
+    qpid::management::ManagementAgent* _agent;
+    qmf::com::redhat::rhm::store::Journal* _mgmtObject;
+    DeleteCallback deleteCallback;
+    
+  public:
+    
+    JournalImpl(const std::string& journalId,
+                const std::string& journalDirectory,
+                const std::string& journalBaseFilename,
+                const qpid::sys::Duration getEventsTimeout,
+                const qpid::sys::Duration flushTimeout,
+                qpid::management::ManagementAgent* agent,
+                DeleteCallback deleteCallback=DeleteCallback() );
 
-        public:
-            JournalImpl(const std::string& journalId,
-                        const std::string& journalDirectory,
-                        const std::string& journalBaseFilename,
-                        const qpid::sys::Duration getEventsTimeout,
-                        const qpid::sys::Duration flushTimeout,
-                        qpid::management::ManagementAgent* agent);
+    virtual ~JournalImpl();
 
-            virtual ~JournalImpl();
+    void initialize(const u_int16_t num_jfiles,
+                    const bool auto_expand,
+                    const u_int16_t ae_max_jfiles,
+                    const u_int32_t jfsize_sblks,
+                    const u_int16_t wcache_num_pages,
+                    const u_int32_t wcache_pgsize_sblks,
+                    mrg::journal::aio_callback* const cbp);
 
-            void initialize(const u_int16_t num_jfiles,
-                            const bool auto_expand,
-                            const u_int16_t ae_max_jfiles,
-                            const u_int32_t jfsize_sblks,
-                            const u_int16_t wcache_num_pages,
-                            const u_int32_t wcache_pgsize_sblks,
-                            mrg::journal::aio_callback* const cbp);
+    inline void initialize(const u_int16_t num_jfiles,
+                           const bool auto_expand,
+                           const u_int16_t ae_max_jfiles,
+                           const u_int32_t jfsize_sblks,
+                           const u_int16_t wcache_num_pages,
+                           const u_int32_t wcache_pgsize_sblks) {
+        initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
+                   this);
+    }
 
-            inline void initialize(const u_int16_t num_jfiles,
-                                   const bool auto_expand,
-                                   const u_int16_t ae_max_jfiles,
-                                   const u_int32_t jfsize_sblks,
-                                   const u_int16_t wcache_num_pages,
-                                   const u_int32_t wcache_pgsize_sblks) {
-                initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
-                        this);
-            }
+    void recover(const u_int16_t num_jfiles,
+                 const bool auto_expand,
+                 const u_int16_t ae_max_jfiles,
+                 const u_int32_t jfsize_sblks,
+                 const u_int16_t wcache_num_pages,
+                 const u_int32_t wcache_pgsize_sblks,
+                 mrg::journal::aio_callback* const cbp,
+                 boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
+                 u_int64_t& highest_rid,
+                 u_int64_t queue_id);
 
-            void recover(const u_int16_t num_jfiles,
-                         const bool auto_expand,
-                         const u_int16_t ae_max_jfiles,
-                         const u_int32_t jfsize_sblks,
-                         const u_int16_t wcache_num_pages,
-                         const u_int32_t wcache_pgsize_sblks,
-                         mrg::journal::aio_callback* const cbp,
-                         boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
-                         u_int64_t& highest_rid,
-                         u_int64_t queue_id);
+    inline void recover(const u_int16_t num_jfiles,
+                        const bool auto_expand,
+                        const u_int16_t ae_max_jfiles,
+                        const u_int32_t jfsize_sblks,
+                        const u_int16_t wcache_num_pages,
+                        const u_int32_t wcache_pgsize_sblks,
+                        boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
+                        u_int64_t& highest_rid,
+                        u_int64_t queue_id) {
+        recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
+                this, prep_tx_list_ptr, highest_rid, queue_id);
+    }
 
-            inline void recover(const u_int16_t num_jfiles,
-                                const bool auto_expand,
-                                const u_int16_t ae_max_jfiles,
-                                const u_int32_t jfsize_sblks,
-                                const u_int16_t wcache_num_pages,
-                                const u_int32_t wcache_pgsize_sblks,
-                                boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
-                                u_int64_t& highest_rid,
-                                u_int64_t queue_id) {
-                recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
-                        this, prep_tx_list_ptr, highest_rid, queue_id);
-            }
+    void recover_complete();
 
-            void recover_complete();
+    // Temporary fn to read and save last msg read from journal so it can be assigned
+    // in chunks. To be replaced when coding to do this direct from the journal is ready.
+    // Returns true if the record is extern, false if local.
+    bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0);
 
-            // Temporary fn to read and save last msg read from journal so it can be assigned
-            // in chunks. To be replaced when coding to do this direct from the journal is ready.
-            // Returns true if the record is extern, false if local.
-            bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0);
+    // Overrides for write inactivity timer
+    void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
+                             const size_t this_data_len, mrg::journal::data_tok* dtokp,
+                             const bool transient = false);
 
-            // Overrides for write inactivity timer
-            void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
-                    const size_t this_data_len, mrg::journal::data_tok* dtokp,
-                    const bool transient = false);
+    void enqueue_extern_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
+                                    const bool transient = false);
 
-            void enqueue_extern_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
-                    const bool transient = false);
+    void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
+                                 const size_t this_data_len, mrg::journal::data_tok* dtokp, const std::string& xid,
+                                 const bool transient = false);
 
-            void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
-                    const size_t this_data_len, mrg::journal::data_tok* dtokp, const std::string& xid,
-                    const bool transient = false);
+    void enqueue_extern_txn_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
+                                        const std::string& xid, const bool transient = false);
 
-            void enqueue_extern_txn_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
-                    const std::string& xid, const bool transient = false);
+    void dequeue_data_record(mrg::journal::data_tok* const dtokp, const bool txn_coml_commit = false);
 
-            void dequeue_data_record(mrg::journal::data_tok* const dtokp, const bool txn_coml_commit = false);
+    void dequeue_txn_data_record(mrg::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
 
-            void dequeue_txn_data_record(mrg::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
+    mrg::journal::iores read_data_record(void** const data_buff, size_t& tot_data_len, void** const xid_buff,
+                                         size_t& xid_len, bool& transient, bool& external, mrg::journal::data_tok* const dtokp,
+                                         bool ignore_pending_txns = false);
 
-            mrg::journal::iores read_data_record(void** const data_buff, size_t& tot_data_len, void** const xid_buff,
-                    size_t& xid_len, bool& transient, bool& external, mrg::journal::data_tok* const dtokp,
-                    bool ignore_pending_txns = false);
+    void txn_abort(mrg::journal::data_tok* const dtokp, const std::string& xid);
 
-            void txn_abort(mrg::journal::data_tok* const dtokp, const std::string& xid);
+    void txn_commit(mrg::journal::data_tok* const dtokp, const std::string& xid);
 
-            void txn_commit(mrg::journal::data_tok* const dtokp, const std::string& xid);
+    void stop(bool block_till_aio_cmpl = false);
 
-            void stop(bool block_till_aio_cmpl = false);
+    // Logging
+    void log(mrg::journal::log_level level, const std::string& log_stmt) const;
+    void log(mrg::journal::log_level level, const char* const log_stmt) const;
 
-            // Logging
-            void log(mrg::journal::log_level level, const std::string& log_stmt) const;
-            void log(mrg::journal::log_level level, const char* const log_stmt) const;
+    // Overrides for get_events timer
+    mrg::journal::iores flush(const bool block_till_aio_cmpl = false);
 
-            // Overrides for get_events timer
-            mrg::journal::iores flush(const bool block_till_aio_cmpl = false);
+    // TimerTask callback
+    void getEventsFire();
+    void flushFire();
 
-            // TimerTask callback
-            void getEventsFire();
-            void flushFire();
+    // AIO callbacks
+    virtual void wr_aio_cb(std::vector<mrg::journal::data_tok*>& dtokl);
+    virtual void rd_aio_cb(std::vector<u_int16_t>& pil);
 
-            // AIO callbacks
-            virtual void wr_aio_cb(std::vector<mrg::journal::data_tok*>& dtokl);
-            virtual void rd_aio_cb(std::vector<u_int16_t>& pil);
+    qpid::management::ManagementObject* GetManagementObject (void) const
+    { return _mgmtObject; }
 
-            qpid::management::ManagementObject* GetManagementObject (void) const
-                { return _mgmtObject; }
+    qpid::management::Manageable::status_t ManagementMethod (uint32_t,
+                                                             qpid::management::Args&,
+                                                             std::string&);
 
-            qpid::management::Manageable::status_t ManagementMethod (uint32_t,
-                                                                     qpid::management::Args&,
-                                                                     std::string&);
+    void resetDeleteCallback() { deleteCallback = DeleteCallback(); }
 
-        private:
-            void free_read_buffers();
+  private:
+    void free_read_buffers();
 
-            inline void setGetEventTimer()
-            {
-                assert(journalTimerPtr != 0);
-                getEventsFireEventsPtr->setupNextFire();
-                journalTimerPtr->add(getEventsFireEventsPtr);
-                getEventsTimerSetFlag = true;
-            }
-            void handleIoResult(const mrg::journal::iores r);
+    inline void setGetEventTimer()
+    {
+        assert(journalTimerPtr != 0);
+        getEventsFireEventsPtr->setupNextFire();
+        journalTimerPtr->add(getEventsFireEventsPtr);
+        getEventsTimerSetFlag = true;
+    }
+    void handleIoResult(const mrg::journal::iores r);
 
-            // Management instrumentation callbacks overridden from jcntl
-            inline void instr_incr_outstanding_aio_cnt() {
-                if (_mgmtObject != 0) _mgmtObject->inc_outstandingAIOs();
-            }
-            inline void instr_decr_outstanding_aio_cnt() {
-                if (_mgmtObject != 0) _mgmtObject->dec_outstandingAIOs();
-            }
-        }; // class JournalImpl
+    // Management instrumentation callbacks overridden from jcntl
+    inline void instr_incr_outstanding_aio_cnt() {
+        if (_mgmtObject != 0) _mgmtObject->inc_outstandingAIOs();
+    }
+    inline void instr_decr_outstanding_aio_cnt() {
+        if (_mgmtObject != 0) _mgmtObject->dec_outstandingAIOs();
+    }
 
-        class TplJournalImpl : public JournalImpl
-        {
-        public:
-            TplJournalImpl(const std::string& journalId,
-                           const std::string& journalDirectory,
-                           const std::string& journalBaseFilename,
-                           const qpid::sys::Duration getEventsTimeout,
-                           const qpid::sys::Duration flushTimeout,
-                           qpid::management::ManagementAgent* agent) :
-                JournalImpl(journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent)
-            {}
+}; // class JournalImpl
 
-            ~TplJournalImpl() {}
+class TplJournalImpl : public JournalImpl
+{
+  public:
+    TplJournalImpl(const std::string& journalId,
+                   const std::string& journalDirectory,
+                   const std::string& journalBaseFilename,
+                   const qpid::sys::Duration getEventsTimeout,
+                   const qpid::sys::Duration flushTimeout,
+                   qpid::management::ManagementAgent* agent) :
+        JournalImpl(journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent)
+    {}
 
-            // Special version of read_data_record that ignores transactions - needed when reading the TPL
-            inline mrg::journal::iores read_data_record(void** const datapp, std::size_t& dsize,
-                    void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
-                    mrg::journal::data_tok* const dtokp) {
-                return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
-            }
-            inline void read_reset() { _rmgr.invalidate(); }
-        }; // class TplJournalImpl
+    ~TplJournalImpl() {}
 
-    } // namespace msgstore
+    // Special version of read_data_record that ignores transactions - needed when reading the TPL
+    inline mrg::journal::iores read_data_record(void** const datapp, std::size_t& dsize,
+                                                void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
+                                                mrg::journal::data_tok* const dtokp) {
+        return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
+    }
+    inline void read_reset() { _rmgr.invalidate(); }
+}; // class TplJournalImpl
+
+} // namespace msgstore
 } // namespace mrg
 
 #endif

Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2010-05-31 14:08:28 UTC (rev 3996)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2010-05-31 19:31:45 UTC (rev 3997)
@@ -367,6 +367,7 @@
         for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
         {
             JournalImpl* jQueue = i->second;
+            jQueue->resetDeleteCallback();
             if (jQueue->is_ready()) jQueue->stop(true);
         }
     }
@@ -479,7 +480,8 @@
     }
 
     jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),  std::string("JournalData"),
-                             defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
+                             defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
+                             boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
     {
         qpid::sys::Mutex::ScopedLock sl(journalListLock);
         journalList[queue.getName()]=jQueue;
@@ -521,7 +523,7 @@
         queue.setExternalQueueStore(0); // will delete the journal if exists
         {
             qpid::sys::Mutex::ScopedLock sl(journalListLock);
-            journalList.erase(journalList.find(queue.getName()));
+            journalList.erase(queue.getName());
         }
     }
 }
@@ -762,7 +764,8 @@
             break;
         }
         jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"),
-                                 defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
+                                 defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
+                                 boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
         {
             qpid::sys::Mutex::ScopedLock sl(journalListLock);
             journalList[queueName] = jQueue;
@@ -1644,6 +1647,11 @@
 
 std::string MessageStoreImpl::getStoreDir() const { return storeDir; }
 
+void MessageStoreImpl::journalDeleted(JournalImpl& j) {
+    qpid::sys::Mutex::ScopedLock sl(journalListLock);
+    journalList.erase(j.id());
+}
+
 MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) :
                                              qpid::Options(name),
                                              numJrnlFiles(defNumJrnlFiles),
@@ -1668,7 +1676,7 @@
 //                 "If no|false|0, the number of journal files will remain fixed (num-jfiles).")
 //         ("max-auto-expand-jfiles", qpid::optValue(autoJrnlExpandMaxFiles, "N"),
 //                 "Maximum number of journal files allowed from auto-expanding; must be greater than --num-jfiles parameter.")
-        ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
+         ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
                 "Default size for each journal file in multiples of read pages (1 read page = 64kiB)")
         ("truncate", qpid::optValue(truncateFlag, "yes|no"),
                 "If yes|true|1, will truncate the store (discard any existing records). If no|false|0, will preserve "
@@ -1687,3 +1695,4 @@
                 "Lower values decrease latency at the expense of throughput.")
         ;
 }
+

Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h	2010-05-31 14:08:28 UTC (rev 3996)
+++ store/trunk/cpp/lib/MessageStoreImpl.h	2010-05-31 19:31:45 UTC (rev 3997)
@@ -149,6 +149,7 @@
 
     qmf::com::redhat::rhm::store::Store* mgmtObject;
     qpid::management::ManagementAgent* agent;
+    
 
     // Parameter validation and calculation
     static u_int16_t chkJrnlNumFilesParam(const u_int16_t param,
@@ -359,6 +360,10 @@
         { return qpid::management::Manageable::STATUS_OK; }
 
     std::string getStoreDir() const;
+
+  private:
+    void journalDeleted(JournalImpl&);
+
 }; // class MessageStoreImpl
 
 } // namespace msgstore



More information about the rhmessaging-commits mailing list