Author: cctrieloff
Date: 2007-10-29 11:03:14 -0400 (Mon, 29 Oct 2007)
New Revision: 1182
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
Log:
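- set bdb home: create <storeDir>/rhm/dat/ and open the BDB environment there; journal base dir moved to <storeDir>/rhm/jrnl/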
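- hack for missing event loop (to be fixed): after flushing a journal, sleep 10 ms and call get_wr_events(); complete dequeues inline after async_dequeue instead of in an async callback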
- memory cleanup: after an enqueue, free xidbuff if it is set, otherwise free dbuff
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-29 14:20:07 UTC (rev 1181)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-29 15:03:14 UTC (rev 1182)
@@ -70,9 +70,11 @@
useAsync = async;
if (dir.size()>0) storeDir = dir;
+ string bdbdir = storeDir + "/rhm/dat/";
+ journal::jdir::create_dir(bdbdir);
TxnCtxt txn;
try {
- env.open(envPath, DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK |
DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
+ env.open(bdbdir.c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK |
DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
txn.begin(env);
open(queueDb, txn.get(), "queues.db", false);
@@ -466,9 +468,11 @@
dtokp.reset();
dtokp.set_wstate(DataTokenImpl::ENQ);
- ::free(dbuff);
- if (xidbuff)
+
+ if (xidbuff)
::free(xidbuff);
+ else if (dbuff)
+ ::free(dbuff);
break;
}
case rhm::journal::RHM_IORES_AIO_WAIT:
@@ -755,7 +759,11 @@
checkInit();
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
- if (jc) jc->flush();
+ if (jc){
+ jc->flush();
+ ::usleep(10000); /////////////// hack ----------- FIX!!
+ jc->get_wr_events();
+ }
}catch ( journal::jexception& e) {
std::string str;
THROW_STORE_EXCEPTION("Flush failed: " + e.to_string(str) );
@@ -904,6 +912,10 @@
// add queue* to the txn map..
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
async_dequeue(ctxt, msg, queue);
+ // added here as we are not doing it async on call back
+ msg.dequeueComplete();
+ if ( msg.isDequeueComplete() ) // clear id after last dequeue
+ msg.setPersistenceId(0);
} else if (txn->isTPC()) {
//if this is part of a 2pc transaction, then only record the dequeue now,
@@ -1176,7 +1188,7 @@
string BdbMessageStore::getJrnlBaseDir()
{
std::stringstream dir;
- dir << storeDir<< "/rhm/" ;
+ dir << storeDir<< "/rhm/jrnl/" ;
return dir.str();
}