Author: cctrieloff
Date: 2007-11-02 21:03:10 -0400 (Fri, 02 Nov 2007)
New Revision: 1230
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/tests/OrderingTest.cpp
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/TransactionalTest.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
store/trunk/cpp/tests/system_test.sh
Log:
- check for db mode change
- support for force
- store last mode in config db - key 1
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-02 22:16:50 UTC (rev 1229)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-03 01:03:10 UTC (rev 1230)
@@ -51,6 +51,7 @@
BdbMessageStore::BdbMessageStore(const char* envpath) : env(0),
queueDb(&env, 0),
+ configDb(&env, 0),
exchangeDb(&env, 0),
messageDb(&env, 0),
mappingDb(&env, 0),
@@ -66,19 +67,23 @@
}
-void BdbMessageStore::init(const std::string& dir, const bool async)
+bool BdbMessageStore::init(const std::string& dir, const bool async, const bool
force)
{
+ if (isInit) return true;
+
useAsync = async;
if (dir.size()>0) storeDir = dir;
string bdbdir = storeDir + "/rhm/dat/";
journal::jdir::create_dir(bdbdir);
TxnCtxt txn;
+ bool ret = false;
try {
env.open(bdbdir.c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK |
DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
txn.begin(env);
open(queueDb, txn.get(), "queues.db", false);
+ open(configDb, txn.get(), "config.db", false);
open(exchangeDb, txn.get(), "exchanges.db", false);
open(messageDb, txn.get(), "messages.db", false);
open(mappingDb, txn.get(), "mappings.db", true);
@@ -94,10 +99,61 @@
txn.abort();
throw e;
}
+ ret = mode(async, force);
+ if (!ret) return false;
isInit = true;
+ return true;
}
+// true is async
+bool BdbMessageStore::mode(const bool async, const bool force)
+{
+
+ u_int32_t id (1); // key one in config is mode
+ Dbt key(&id, sizeof(id));
+ size_t preamble_length = sizeof(u_int32_t);
+ BufferValue value(preamble_length, 0);
+ u_int32_t avalue = async ? 1 : 2;
+ value.buffer.putLong( avalue );
+ bool same = false;
+ bool hasMode = false;
+
+ {
+ Cursor config;
+ config.open(configDb, 0);
+ IdDbt rkey;
+ BufferValue rvalue(preamble_length, 0);
+ rvalue.buffer.record();
+
+ while (config.next(rkey, rvalue)) {
+ if (rkey.id == 1)
+ {
+ hasMode = true;
+ u_int32_t valueL = rvalue.buffer.getLong();
+ if (avalue == valueL){
+ same = true;
+ }else {
+ break;
+ }
+ }
+ }
+ }
+ if (same) return true;
+ if (!same && !force && hasMode) return false;
+ if (!same && force && hasMode) {
+ truncate();
+ }
+
+ int status = configDb.put(0, &key, &value, DB_NOOVERWRITE | DB_AUTO_COMMIT
);
+ if (status == DB_KEYEXIST) {
+ return false;
+ } else {
+ return true;
+ }
+ return false;
+}
+
void BdbMessageStore::open(Db& db, DbTxn* txn, const char* file, bool dupKey)
{
if(dupKey) db.set_flags(DB_DUPSORT);
@@ -121,7 +177,6 @@
void BdbMessageStore::truncate()
{
- checkInit();
DbTxn* txn;
env.txn_begin(0, &txn, 0);
u_int32_t count;
@@ -131,15 +186,13 @@
}
txn->commit(0);
- if (usingJrnl()) {
- try{
- journal::jdir::delete_dir(getJrnlBaseDir(),true);
- }
- catch ( journal::jexception& e) {
- std::string str;
- THROW_STORE_EXCEPTION("Truncate clean up failed: " +e.to_string(str)
);
- }
- }
+ try{
+ journal::jdir::delete_dir(getJrnlBaseDir(),true);
+ }
+ catch ( journal::jexception& e) {
+ std::string str;
+ THROW_STORE_EXCEPTION("Truncate clean up failed: " +e.to_string(str)
);
+ }
}
void BdbMessageStore::create(PersistableQueue& queue)
@@ -337,7 +390,6 @@
recoverMessages(txn, registry, queue, prepared, messages);
jQueue->recover_complete(); // start journal.
} catch (const journal::jexception& e) {
-std::cout << e << std::flush;
std::string s;
THROW_STORE_EXCEPTION(e.to_string(s) + queueName);
}
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-11-02 22:16:50 UTC (rev 1229)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-11-03 01:03:10 UTC (rev 1230)
@@ -62,6 +62,7 @@
std::list<Db*> dbs;
DbEnv env;
Db queueDb;
+ Db configDb;
Db exchangeDb;
Db messageDb;
Db mappingDb;
@@ -78,6 +79,7 @@
const char* envPath;
static qpid::sys::Duration defJournalTimeout;
+ bool mode(const bool mode, const bool force);
void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager&
recovery, queue_index& index,
txn_list& locked, message_index& messages);
void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager&
recovery, queue_index& index,
@@ -121,11 +123,11 @@
string getJrnlDir(const char* queueName);
static inline bool usingJrnl() {return useAsync;}
string getJrnlBaseDir();
- inline void checkInit() { if (!isInit) init("/var",false); isInit = true;}
+ inline void checkInit() { if (!isInit) init("/var",false, false); isInit =
true;}
public:
BdbMessageStore(const char* envpath = 0);
- void init(const std::string& dir, const bool async);
+ bool init(const std::string& dir, const bool async, const bool force = false);
~BdbMessageStore();
void truncate();
Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp 2007-11-02 22:16:50 UTC (rev 1229)
+++ store/trunk/cpp/tests/OrderingTest.cpp 2007-11-03 01:03:10 UTC (rev 1230)
@@ -104,7 +104,7 @@
void setup(bool async)
{
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init("/var",async);
+ store->init("/var",async,true);
store->truncate();
queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0));
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2007-11-02 22:16:50 UTC (rev 1229)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2007-11-03 01:03:10 UTC (rev 1230)
@@ -126,7 +126,7 @@
void testCreateDelete(bool async)
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var",async, true);
store.truncate();//make sure it is empty to begin with
string name("CreateDeleteQueue");
Queue queue(name, 0, &store, 0);
Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp 2007-11-02 22:16:50 UTC (rev 1229)
+++ store/trunk/cpp/tests/TransactionalTest.cpp 2007-11-03 01:03:10 UTC (rev 1230)
@@ -95,7 +95,7 @@
void setup(bool async)
{
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init("/var",async);
+ store->init("/var",async, true);
store->truncate();
//create two queues:
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2007-11-02 22:16:50 UTC (rev 1229)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2007-11-03 01:03:10 UTC (rev 1230)
@@ -336,7 +336,7 @@
void setup()
{
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init("/var",async);
+ store->init("/var",async,true);
store->truncate();
//create two queues:
Modified: store/trunk/cpp/tests/system_test.sh
===================================================================
--- store/trunk/cpp/tests/system_test.sh 2007-11-02 22:16:50 UTC (rev 1229)
+++ store/trunk/cpp/tests/system_test.sh 2007-11-03 01:03:10 UTC (rev 1230)
@@ -63,13 +63,12 @@
JRNLFLAGS=''
while ((sync <= 1)); do
echo
- echo '*** FIXME: Journal cannot be started when previous mode was BDB and
database exists.'
- rm -rf $WORKING_DIR/*
if ((sync == 1)); then
- JRNLFLAGS='--store-async yes'
+ JRNLFLAGS='--store-async yes --store-force yes'
mode='jrnl'
echo 'Journal (AIO) persistence...'
else
+ JRNLFLAGS='--store-force yes'
mode='bdb'
echo 'BDB persistence...'
fi