[rhmessaging-commits] rhmessaging commits: r1182 - store/trunk/cpp/lib.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Oct 29 11:03:14 EDT 2007


Author: cctrieloff
Date: 2007-10-29 11:03:14 -0400 (Mon, 29 Oct 2007)
New Revision: 1182

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
Log:
- set bdb home
- hack for missing event loop
- memory cleanup



Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-29 14:20:07 UTC (rev 1181)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-29 15:03:14 UTC (rev 1182)
@@ -70,9 +70,11 @@
 	useAsync = async;
 	if (dir.size()>0) storeDir = dir;
 
+	string bdbdir = storeDir + "/rhm/dat/";
+	journal::jdir::create_dir(bdbdir);
     TxnCtxt txn;
     try {
-        env.open(envPath, DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
+        env.open(bdbdir.c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
 
         txn.begin(env);
         open(queueDb, txn.get(), "queues.db", false);
@@ -466,9 +468,11 @@
 
     				dtokp.reset();
 	    			dtokp.set_wstate(DataTokenImpl::ENQ);
-					::free(dbuff);
-					if (xidbuff)
+					
+    				if (xidbuff)
 						::free(xidbuff);
+					else if (dbuff)
+						::free(dbuff);
                     break;
 		        }
                 case rhm::journal::RHM_IORES_AIO_WAIT:
@@ -755,7 +759,11 @@
 	checkInit();
     try {
 		JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
-		if (jc) jc->flush();
+		if (jc){
+		    jc->flush();
+			::usleep(10000);  /////////////// hack ----------- FIX!!
+			jc->get_wr_events();
+		}
     }catch ( journal::jexception& e) {
        std::string str;
        THROW_STORE_EXCEPTION("Flush failed: " + e.to_string(str) );
@@ -904,6 +912,10 @@
 			// add queue* to the txn map..
 			if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
 			async_dequeue(ctxt, msg, queue); 
+     	    // added here as we are not doing it async on call back
+			msg.dequeueComplete();
+		    if ( msg.isDequeueComplete()  ) // clear id after last dequeue
+		         msg.setPersistenceId(0);
 			
 		} else if (txn->isTPC()) {
             //if this is part of a 2pc transaction, then only record the dequeue now,
@@ -1176,7 +1188,7 @@
 string BdbMessageStore::getJrnlBaseDir() 
 {
     std::stringstream dir;
-    dir << storeDir<< "/rhm/" ;
+    dir << storeDir<< "/rhm/jrnl/" ;
     return dir.str();
 }
 




More information about the rhmessaging-commits mailing list