[rhmessaging-commits] rhmessaging commits: r1114 - store/trunk/cpp/lib.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Thu Oct 18 21:34:08 EDT 2007


Author: cctrieloff
Date: 2007-10-18 21:34:08 -0400 (Thu, 18 Oct 2007)
New Revision: 1114

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
Log:
- add flush for async
- get config for dir + async from broker



Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-18 17:19:49 UTC (rev 1113)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-10-19 01:34:08 UTC (rev 1114)
@@ -42,6 +42,7 @@
 
 static const u_int8_t MESSAGE_MESSAGE = 1;
 static const u_int8_t BASIC_MESSAGE = 2;
+bool BdbMessageStore::useAsync;
 
 unsigned int TxnCtxt::count = 0;
 
@@ -54,12 +55,23 @@
                                                         bindingDb(&env, 0), 
                                                         enqueueXidDb(&env, 0), 
                                                         dequeueXidDb(&env, 0), 
-                                                        prepareXidDb(&env, 0)
- 
+                                                        prepareXidDb(&env, 0),
+														isInit(false),
+														envPath(envpath)
+
 {
+
+
+}
+
+void BdbMessageStore::init(const std::string& dir, const bool async) 
+{ 
+	useAsync = async;
+	if (dir.size()>0) storeDir = dir;
+
     TxnCtxt txn;
     try {
-        env.open(envpath, DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
+        env.open(envPath, DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
 
         txn.begin(env);
         open(queueDb, txn.get(), "queues.db", false);
@@ -78,11 +90,13 @@
         txn.abort();
         throw e;
     }
+
+	isInit = true;
 }
 
 void BdbMessageStore::open(Db& db, DbTxn* txn, const char* file, bool dupKey)
 {
-    if(dupKey) db.set_flags(DB_DUPSORT);
+	if(dupKey) db.set_flags(DB_DUPSORT);
     db.open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
     dbs.push_back(&db);
 }
@@ -103,6 +117,7 @@
 
 void BdbMessageStore::truncate()
 {
+	checkInit();
     DbTxn* txn;
     env.txn_begin(0, &txn, 0);
     u_int32_t count;
@@ -125,6 +140,7 @@
 
 void BdbMessageStore::create(PersistableQueue& queue)
 {
+	checkInit();
     if (queue.getPersistenceId()) {
         THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
     }
@@ -151,6 +167,7 @@
 
 void BdbMessageStore::destroy(PersistableQueue& queue)
 {
+	checkInit();
     destroy(queueDb, queue);
     qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
     if (eqs)
@@ -164,6 +181,7 @@
 
 void BdbMessageStore::create(const PersistableExchange& exchange)
 {
+	checkInit();
     if (exchange.getPersistenceId()) {
         THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName());
     }
@@ -179,6 +197,7 @@
 
 void BdbMessageStore::destroy(const PersistableExchange& exchange)
 {
+	checkInit();
     destroy(exchangeDb, exchange);
     //need to also delete bindings
     IdDbt key(exchange.getPersistenceId());
@@ -210,6 +229,7 @@
 void BdbMessageStore::bind(const PersistableExchange& e, const PersistableQueue& q, 
                               const std::string& k, const FieldTable& a)
 {
+	checkInit();
     IdDbt key(e.getPersistenceId());    
     BindingDbt value(e, q, k, a);
     TxnCtxt txn;
@@ -221,6 +241,7 @@
 void BdbMessageStore::unbind(const PersistableExchange& e, const PersistableQueue& q, 
                                 const std::string& k, const FieldTable& a)
 {
+	checkInit();
     IdDbt key(e.getPersistenceId());    
     BindingDbt value(e, q, k, a);
 
@@ -237,6 +258,7 @@
 
 void BdbMessageStore::recover(RecoveryManager& registry)
 {
+	checkInit();
     txn_list prepared;
     recoverXids(prepared);
 
@@ -623,6 +645,7 @@
 
 void BdbMessageStore::stage(PersistableMessage& msg)
 {
+	checkInit();
     TxnCtxt txn;
     txn.begin(env);
 
@@ -642,6 +665,7 @@
 }
 void BdbMessageStore::destroy(PersistableMessage& msg)
 {
+	checkInit();
     u_int64_t messageId (msg.getPersistenceId());
     if (messageId) {
         Dbt key (&messageId, sizeof(messageId));
@@ -675,6 +699,7 @@
 
 void BdbMessageStore::appendContent(const PersistableMessage& msg, const std::string& data)
 {
+	checkInit();
     u_int64_t messageId (msg.getPersistenceId());
     if (messageId != 0) {
         try {
@@ -702,6 +727,7 @@
 }
 void BdbMessageStore::loadContent(const PersistableMessage& msg, std::string& data, u_int64_t offset, u_int32_t length)
 {
+	checkInit();
     u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/+ msg.encodedHeaderSize();
     u_int64_t messageId (msg.getPersistenceId());
     if (messageId != 0) {
@@ -729,8 +755,21 @@
     }
 }
 
+void BdbMessageStore::flush(const qpid::broker::PersistableQueue& queue)
+{
+	checkInit();
+    try {
+		JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
+		if (jc) jc->flush();
+    }catch ( journal::jexception& e) {
+       std::string str;
+       THROW_STORE_EXCEPTION("Flush failed: " + e.to_string(str) );
+	}
+}
+
 void BdbMessageStore::enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
 {
+	checkInit();
     u_int64_t queueId (queue.getPersistenceId());
     u_int64_t messageId (msg.getPersistenceId());
     if (queueId == 0) {
@@ -846,7 +885,7 @@
       }
     }catch ( journal::jexception& e) {
        std::string str;
-	   std::cout << "-------------" << e << std::endl;
+//	   std::cout << "-------------" << e << std::endl;
 	   if (dtokp) delete dtokp;
        THROW_STORE_EXCEPTION("Enqueue failed: " +e.to_string(str) );
     }catch (DbException& e) {
@@ -856,6 +895,7 @@
 
 void BdbMessageStore::dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
 {    
+	checkInit();
     u_int64_t queueId (queue.getPersistenceId());
     u_int64_t messageId (msg.getPersistenceId());
     if (messageId == 0) {
@@ -974,6 +1014,7 @@
 
 u_int32_t BdbMessageStore::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/)
 {
+	checkInit();
     return 0;
 }
 
@@ -1034,6 +1075,7 @@
 
 auto_ptr<TransactionContext> BdbMessageStore::begin() 
 {
+	checkInit();
 	// pass sequence number for c/a when using jrnl
     TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
     txn->begin(env);
@@ -1042,6 +1084,7 @@
 
 std::auto_ptr<qpid::broker::TPCTransactionContext> BdbMessageStore::begin(const std::string& xid)
 {
+	checkInit();
 	IdSequence* jtx = NULL;
 	if (usingJrnl()) jtx = &messageIdSequence;
 
@@ -1053,6 +1096,7 @@
 
 void BdbMessageStore::prepare(qpid::broker::TPCTransactionContext& ctxt)
 {
+	checkInit();
     TPCTxnCtxt* txn = dynamic_cast<TPCTxnCtxt*>(&ctxt);
     if(!txn) throw InvalidTransactionContextException();
     
@@ -1075,7 +1119,7 @@
 
 void BdbMessageStore::commit(TransactionContext& ctxt) 
 {
-std::cout << " commit1" << std::flush;
+	checkInit();
      TxnCtxt* txn(check(&ctxt));
     if (txn->isTPC()) {
         completed(*dynamic_cast<TPCTxnCtxt*>(txn), enqueueXidDb, dequeueXidDb, true);        
@@ -1086,6 +1130,7 @@
 
 void BdbMessageStore::abort(TransactionContext& ctxt) 
 {
+	checkInit();
     TxnCtxt* txn(check(&ctxt));
     if (txn->isTPC()) {
         completed(*dynamic_cast<TPCTxnCtxt*>(txn), dequeueXidDb, enqueueXidDb, false);
@@ -1152,7 +1197,7 @@
 string BdbMessageStore::getJrnlBaseDir() 
 {
     std::stringstream dir;
-    dir << "/var/rhm/" ;
+    dir << storeDir<< "/rhm/" ;
     return dir.str();
 }
 
@@ -1167,7 +1212,6 @@
     dir << getJrnlBaseDir();
     dir << std::setw(4);
     dir << std::setfill('0');
-//    const char* str = queueName; //queue.getName().c_str();
     u_int32_t count = 0;
     for (u_int32_t i=0; i < strlen(queueName); i++)
     	count += queueName[i];

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2007-10-18 17:19:49 UTC (rev 1113)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2007-10-19 01:34:08 UTC (rev 1114)
@@ -71,6 +71,10 @@
             IdSequence queueIdSequence;
             IdSequence exchangeIdSequence;
             IdSequence messageIdSequence;
+			static bool useAsync;
+			std::string storeDir;
+			bool isInit;
+			const char* envPath;
 
             void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
 	    			 txn_list& locked, message_index& messages);
@@ -113,12 +117,13 @@
 	  	  	void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
 	  	  	string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
 	  	  	string getJrnlDir(const char* queueName);
-	  	  	static inline bool usingJrnl() {return false;} // make configurable
+	  	  	static inline bool usingJrnl() {return useAsync;} 
 	  	  	string getJrnlBaseDir(); 
+			inline void checkInit() { if (!isInit)	init("/var",false);	isInit = true;}
 
-
         public:
             BdbMessageStore(const char* envpath = 0);
+			void init(const std::string& dir, const bool async);
             ~BdbMessageStore();
 
             void truncate();
@@ -149,6 +154,7 @@
             void dequeue(qpid::broker::TransactionContext* ctxt, 
                          qpid::broker::PersistableMessage& msg, 
                          const qpid::broker::PersistableQueue& queue);
+            void flush(const qpid::broker::PersistableQueue& queue);
 
     	    u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
 




More information about the rhmessaging-commits mailing list