[rhmessaging-commits] rhmessaging commits: r1230 - in store/trunk/cpp: tests and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Nov 2 21:03:10 EDT 2007


Author: cctrieloff
Date: 2007-11-02 21:03:10 -0400 (Fri, 02 Nov 2007)
New Revision: 1230

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
   store/trunk/cpp/tests/OrderingTest.cpp
   store/trunk/cpp/tests/SimpleTest.cpp
   store/trunk/cpp/tests/TransactionalTest.cpp
   store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
   store/trunk/cpp/tests/system_test.sh
Log:
- check for db mode change
- support for force
- store last mode in config db - key 1



Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-11-02 22:16:50 UTC (rev 1229)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-11-03 01:03:10 UTC (rev 1230)
@@ -51,6 +51,7 @@
 
 BdbMessageStore::BdbMessageStore(const char* envpath) : env(0), 
                                                         queueDb(&env, 0), 
+                                                        configDb(&env, 0), 
                                                         exchangeDb(&env, 0), 
                                                         messageDb(&env, 0), 
                                                         mappingDb(&env, 0), 
@@ -66,19 +67,23 @@
 
 }
 
-void BdbMessageStore::init(const std::string& dir, const bool async) 
+bool BdbMessageStore::init(const std::string& dir, const bool async, const bool force) 
 { 
+	if (isInit) return true;
+	
 	useAsync = async;
 	if (dir.size()>0) storeDir = dir;
 
 	string bdbdir = storeDir + "/rhm/dat/";
 	journal::jdir::create_dir(bdbdir);
     TxnCtxt txn;
+	bool ret = false;
     try {
         env.open(bdbdir.c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
 
         txn.begin(env);
         open(queueDb, txn.get(), "queues.db", false);
+        open(configDb, txn.get(), "config.db", false);
         open(exchangeDb, txn.get(), "exchanges.db", false);
         open(messageDb, txn.get(), "messages.db", false);       
         open(mappingDb, txn.get(), "mappings.db", true);
@@ -94,10 +99,61 @@
         txn.abort();
         throw e;
     }
+	ret = mode(async, force);
+	if (!ret) return false;
 
 	isInit = true;
+	return true;
 }
 
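+// async == true is async (journal) mode. Compares the requested mode with the one stored under key 1 of the config db; without force, a mismatch returns false.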
+bool BdbMessageStore::mode(const bool async, const bool force)
+{
+
+	u_int32_t id (1); // key one in config is mode
+    Dbt key(&id, sizeof(id));
+    size_t preamble_length = sizeof(u_int32_t);
+    BufferValue value(preamble_length, 0);
+	u_int32_t avalue = async ? 1 : 2;
+	value.buffer.putLong( avalue );
+	bool same = false;
+	bool hasMode = false;
+    
+    {	
+	    Cursor config;
+        config.open(configDb, 0);
+        IdDbt rkey;
+        BufferValue rvalue(preamble_length, 0);
+        rvalue.buffer.record();
+
+        while (config.next(rkey, rvalue)) {
+            if (rkey.id == 1)
+		    {
+		        hasMode = true;
+			    u_int32_t valueL = rvalue.buffer.getLong();
+ 			    if (avalue == valueL){
+				    same = true;
+		        }else {
+				    break;
+		        }
+            }
+       }
+    }
+    if (same) return true;
+	if (!same && !force && hasMode) return false; 
+	if (!same && force && hasMode) {
+		truncate();
+    }
+	
+    int status = configDb.put(0, &key, &value, DB_NOOVERWRITE | DB_AUTO_COMMIT );
+    if (status == DB_KEYEXIST) {
+        return false;
+    } else {
+        return true;
+    }
+	return false;
+}
+
 void BdbMessageStore::open(Db& db, DbTxn* txn, const char* file, bool dupKey)
 {
 	if(dupKey) db.set_flags(DB_DUPSORT);
@@ -121,7 +177,6 @@
 
 void BdbMessageStore::truncate()
 {
-	checkInit();
     DbTxn* txn;
     env.txn_begin(0, &txn, 0);
     u_int32_t count;
@@ -131,15 +186,13 @@
     }
 
     txn->commit(0); 
-    if (usingJrnl()) {
-		try{    
-    	    journal::jdir::delete_dir(getJrnlBaseDir(),true);
-        }    
-		catch ( journal::jexception& e) {
-            std::string str;
-    	    THROW_STORE_EXCEPTION("Truncate clean up failed: " +e.to_string(str) );
-        } 
-    }      
+	try{    
+        journal::jdir::delete_dir(getJrnlBaseDir(),true);
+    }    
+	catch ( journal::jexception& e) {
+        std::string str;
+        THROW_STORE_EXCEPTION("Truncate clean up failed: " +e.to_string(str) );
+    } 
 }
 
 void BdbMessageStore::create(PersistableQueue& queue)
@@ -337,7 +390,6 @@
                   recoverMessages(txn, registry, queue, prepared, messages); 
 				  jQueue->recover_complete(); // start journal.
 	          } catch (const journal::jexception& e) {
-std::cout << e << std::flush;
 	              std::string s;
                   THROW_STORE_EXCEPTION(e.to_string(s) + queueName);
               }

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2007-11-02 22:16:50 UTC (rev 1229)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2007-11-03 01:03:10 UTC (rev 1230)
@@ -62,6 +62,7 @@
             std::list<Db*> dbs;
             DbEnv env;
             Db queueDb;
+            Db configDb;
             Db exchangeDb;
             Db messageDb;
             Db mappingDb;
@@ -78,6 +79,7 @@
 			const char* envPath;
             static qpid::sys::Duration defJournalTimeout;
 
+            bool mode(const bool mode, const bool force);
             void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
 	    			 txn_list& locked, message_index& messages);
             void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index, 
@@ -121,11 +123,11 @@
 	  	  	string getJrnlDir(const char* queueName);
 	  	  	static inline bool usingJrnl() {return useAsync;} 
 	  	  	string getJrnlBaseDir(); 
-			inline void checkInit() { if (!isInit)	init("/var",false);	isInit = true;}
+			inline void checkInit() { if (!isInit)	init("/var",false, false);	isInit = true;}
 
         public:
             BdbMessageStore(const char* envpath = 0);
-			void init(const std::string& dir, const bool async);
+			bool init(const std::string& dir, const bool async, const bool force = false);
             ~BdbMessageStore();
 
             void truncate();

Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp	2007-11-02 22:16:50 UTC (rev 1229)
+++ store/trunk/cpp/tests/OrderingTest.cpp	2007-11-03 01:03:10 UTC (rev 1230)
@@ -104,7 +104,7 @@
     void setup(bool async)
     {
         store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
-        store->init("/var",async);
+        store->init("/var",async,true);
 		store->truncate();
 
         queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0));

Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp	2007-11-02 22:16:50 UTC (rev 1229)
+++ store/trunk/cpp/tests/SimpleTest.cpp	2007-11-03 01:03:10 UTC (rev 1230)
@@ -126,7 +126,7 @@
     void testCreateDelete(bool async)
     {
         BdbMessageStore store;
-        store.init("/var",async);
+        store.init("/var",async, true);
         store.truncate();//make sure it is empty to begin with
         string name("CreateDeleteQueue");
         Queue queue(name, 0, &store, 0);

Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp	2007-11-02 22:16:50 UTC (rev 1229)
+++ store/trunk/cpp/tests/TransactionalTest.cpp	2007-11-03 01:03:10 UTC (rev 1230)
@@ -95,7 +95,7 @@
     void setup(bool async)
     {
         store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
-        store->init("/var",async);
+        store->init("/var",async, true);
         store->truncate();
 
         //create two queues:

Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp	2007-11-02 22:16:50 UTC (rev 1229)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp	2007-11-03 01:03:10 UTC (rev 1230)
@@ -336,7 +336,7 @@
     void setup()
     {
         store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
-        store->init("/var",async);
+        store->init("/var",async,true);
         store->truncate();
 
         //create two queues:

Modified: store/trunk/cpp/tests/system_test.sh
===================================================================
--- store/trunk/cpp/tests/system_test.sh	2007-11-02 22:16:50 UTC (rev 1229)
+++ store/trunk/cpp/tests/system_test.sh	2007-11-03 01:03:10 UTC (rev 1230)
@@ -63,13 +63,12 @@
 JRNLFLAGS=''
 while ((sync <= 1)); do
     echo
-    echo '*** FIXME: Journal cannot be started when previous mode was BDB and database exists.'
-    rm -rf $WORKING_DIR/*
     if ((sync == 1)); then
-        JRNLFLAGS='--store-async yes'
+        JRNLFLAGS='--store-async yes --store-force yes'
         mode='jrnl'
         echo 'Journal (AIO) persistence...'
     else
+        JRNLFLAGS='--store-force yes'
         mode='bdb'
         echo 'BDB persistence...'
     fi




More information about the rhmessaging-commits mailing list