[rhmessaging-commits] rhmessaging commits: r3518 - store/trunk/cpp/lib.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Thu Jul 23 15:41:47 EDT 2009
Author: kpvdr
Date: 2009-07-23 15:41:47 -0400 (Thu, 23 Jul 2009)
New Revision: 3518
Modified:
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.h
store/trunk/cpp/lib/StorePlugin.cpp
Log:
Added a finalize() to MessageStoreImpl which stops all instances of the store immediately prior to module unload instead of relying solely on the destructor of each instance.
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-07-22 18:51:08 UTC (rev 3517)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-07-23 19:41:47 UTC (rev 3518)
@@ -333,6 +333,7 @@
open(bindingDb, txn.get(), "bindings.db", true);
open(generalDb, txn.get(), "general.db", false);
tplStorePtr.reset(new TplJournalImpl("TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, agent));
+ journalList["TplStore"]=tplStorePtr.get();
txn.commit();
} catch (const journal::jexception& e) {
txn.abort();
@@ -348,6 +349,15 @@
isInit = true;
}
+void MessageStoreImpl::finalize()
+{
+ for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
+ {
+ JournalImpl* jQueue = i->second;
+ if (jQueue->is_ready()) jQueue->stop(true);
+ }
+}
+
void MessageStoreImpl::pushDown(const char* dirName, const char* bakDirName)
{
DIR* dir = ::opendir(dirName);
@@ -420,7 +430,7 @@
for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
}
- if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
+// if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
} catch (const DbException& e) {
QPID_LOG(error, "Error closing BDB databases: " << e.what());
} catch (const journal::jexception& e) {
@@ -490,6 +500,7 @@
jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
std::string("JournalData"), defJournalGetEventsTimeout,
defJournalFlushTimeout, agent);
+ journalList[queue.getName()]=jQueue;
}
value = args.get("qpid.auto_expand");
@@ -526,6 +537,7 @@
JournalImpl* jQueue = static_cast<JournalImpl*>(eqs);
jQueue->delete_jrnl_files();
queue.setExternalQueueStore(0); // will delete the journal if exists
+ journalList.erase(journalList.find(queue.getName()));
}
}
@@ -767,6 +779,7 @@
{
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
jQueue = new JournalImpl(queueName, getJrnlDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
+ journalList[queueName] = jQueue;
}
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h 2009-07-22 18:51:08 UTC (rev 3517)
+++ store/trunk/cpp/lib/MessageStoreImpl.h 2009-07-23 19:41:47 UTC (rev 3518)
@@ -92,6 +92,10 @@
typedef std::map<std::string, TplRecover> TplRecoverMap;
typedef TplRecoverMap::const_iterator TplRecoverMapCitr;
+ typedef std::pair<std::string, JournalImpl*> JournalListPair;
+ typedef std::map<std::string, JournalImpl*> JournalListMap;
+ typedef JournalListMap::iterator JournalListMapItr;
+
// Default store settings
static const u_int16_t defNumJrnlFiles = 8;
static const u_int32_t defJrnlFileSizePgs = 24;
@@ -115,6 +119,7 @@
// Pointer to Transaction Prepared List (TPL) journal instance
boost::shared_ptr<TplJournalImpl> tplStorePtr;
TplRecoverMap tplRecoverMap;
+ JournalListMap journalList;
IdSequence queueIdSequence;
IdSequence exchangeIdSequence;
@@ -136,6 +141,7 @@
const char* envPath;
static qpid::sys::Duration defJournalGetEventsTimeout;
static qpid::sys::Duration defJournalFlushTimeout;
+
qmf::com::redhat::rhm::store::Store* mgmtObject;
qpid::sys::Mutex jrnlCreateLock;
qpid::management::ManagementAgent* agent;
@@ -296,6 +302,8 @@
void initManagement (qpid::broker::Broker* broker);
+ void finalize();
+
void truncate();
void create(qpid::broker::PersistableQueue& queue,
Modified: store/trunk/cpp/lib/StorePlugin.cpp
===================================================================
--- store/trunk/cpp/lib/StorePlugin.cpp 2009-07-22 18:51:08 UTC (rev 3517)
+++ store/trunk/cpp/lib/StorePlugin.cpp 2009-07-23 19:41:47 UTC (rev 3518)
@@ -55,6 +55,7 @@
}
store->init(&options);
broker->setStore (store);
+ target.addFinalizer(boost::bind(&StorePlugin::finalize, this));
}
void initialize(Plugin::Target& target)
@@ -63,6 +64,8 @@
((mrg::msgstore::MessageStoreImpl*) store)->initManagement (broker);
}
+ void finalize() { static_cast<mrg::msgstore::MessageStoreImpl*>(store)->finalize(); }
+
const char* id() {return "StorePlugin";}
};
More information about the rhmessaging-commits
mailing list