[rhmessaging-commits] rhmessaging commits: r3518 - store/trunk/cpp/lib.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Jul 23 15:41:47 EDT 2009


Author: kpvdr
Date: 2009-07-23 15:41:47 -0400 (Thu, 23 Jul 2009)
New Revision: 3518

Modified:
   store/trunk/cpp/lib/MessageStoreImpl.cpp
   store/trunk/cpp/lib/MessageStoreImpl.h
   store/trunk/cpp/lib/StorePlugin.cpp
Log:
Added a finalize() to MessageStoreImpl which stops all instances of the store immediately prior to module unload instead of relying solely on the destructor of each instance.

Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2009-07-22 18:51:08 UTC (rev 3517)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2009-07-23 19:41:47 UTC (rev 3518)
@@ -333,6 +333,7 @@
         open(bindingDb, txn.get(), "bindings.db", true);
         open(generalDb, txn.get(), "general.db",  false);
         tplStorePtr.reset(new TplJournalImpl("TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, agent));
+        journalList["TplStore"]=tplStorePtr.get();
         txn.commit();
     } catch (const journal::jexception& e) {
         txn.abort();
@@ -348,6 +349,15 @@
     isInit = true;
 }
 
+void MessageStoreImpl::finalize()
+{
+    for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
+    {
+        JournalImpl* jQueue = i->second;
+        if (jQueue->is_ready()) jQueue->stop(true);
+    }
+}
+
 void MessageStoreImpl::pushDown(const char* dirName, const char* bakDirName)
 {
     DIR* dir = ::opendir(dirName);
@@ -420,7 +430,7 @@
         for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
             (*i)->close(0);
         }
-        if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
+//        if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
     } catch (const DbException& e) {
         QPID_LOG(error, "Error closing BDB databases: " <<  e.what());
     } catch (const journal::jexception& e) {
@@ -490,6 +500,7 @@
         jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
                                  std::string("JournalData"), defJournalGetEventsTimeout,
                                  defJournalFlushTimeout, agent);
+        journalList[queue.getName()]=jQueue;
     }
 
     value = args.get("qpid.auto_expand");
@@ -526,6 +537,7 @@
         JournalImpl* jQueue = static_cast<JournalImpl*>(eqs);
         jQueue->delete_jrnl_files();
         queue.setExternalQueueStore(0); // will delete the journal if exists
+        journalList.erase(journalList.find(queue.getName()));
     }
 }
 
@@ -767,6 +779,7 @@
         {
             qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
             jQueue = new JournalImpl(queueName, getJrnlDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
+            journalList[queueName] = jQueue;
         }
         queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
 

Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h	2009-07-22 18:51:08 UTC (rev 3517)
+++ store/trunk/cpp/lib/MessageStoreImpl.h	2009-07-23 19:41:47 UTC (rev 3518)
@@ -92,6 +92,10 @@
     typedef std::map<std::string, TplRecover> TplRecoverMap;
     typedef TplRecoverMap::const_iterator TplRecoverMapCitr;
 
+    typedef std::pair<std::string, JournalImpl*> JournalListPair;
+    typedef std::map<std::string, JournalImpl*> JournalListMap;
+    typedef JournalListMap::iterator JournalListMapItr;
+
     // Default store settings
     static const u_int16_t defNumJrnlFiles = 8;
     static const u_int32_t defJrnlFileSizePgs = 24;
@@ -115,6 +119,7 @@
     // Pointer to Transaction Prepared List (TPL) journal instance
     boost::shared_ptr<TplJournalImpl> tplStorePtr;
     TplRecoverMap tplRecoverMap;
+    JournalListMap journalList;
 
     IdSequence queueIdSequence;
     IdSequence exchangeIdSequence;
@@ -136,6 +141,7 @@
     const char* envPath;
     static qpid::sys::Duration defJournalGetEventsTimeout;
     static qpid::sys::Duration defJournalFlushTimeout;
+
     qmf::com::redhat::rhm::store::Store* mgmtObject;
     qpid::sys::Mutex jrnlCreateLock;
     qpid::management::ManagementAgent* agent;
@@ -296,6 +302,8 @@
 
     void initManagement (qpid::broker::Broker* broker);
 
+    void finalize();
+
     void truncate();
 
     void create(qpid::broker::PersistableQueue& queue,

Modified: store/trunk/cpp/lib/StorePlugin.cpp
===================================================================
--- store/trunk/cpp/lib/StorePlugin.cpp	2009-07-22 18:51:08 UTC (rev 3517)
+++ store/trunk/cpp/lib/StorePlugin.cpp	2009-07-23 19:41:47 UTC (rev 3518)
@@ -55,6 +55,7 @@
         }
         store->init(&options);
         broker->setStore (store);
+        target.addFinalizer(boost::bind(&StorePlugin::finalize, this));
     }
 
     void initialize(Plugin::Target& target)
@@ -63,6 +64,8 @@
         ((mrg::msgstore::MessageStoreImpl*) store)->initManagement (broker);
     }
 
+    void finalize() { static_cast<mrg::msgstore::MessageStoreImpl*>(store)->finalize(); }
+
     const char* id() {return "StorePlugin";}
 };
 



More information about the rhmessaging-commits mailing list