Author: kpvdr
Date: 2008-10-29 13:33:09 -0400 (Wed, 29 Oct 2008)
New Revision: 2693
Modified:
store/trunk/cpp/lib/Cursor.h
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/MessageStoreImpl.h
Log:
Fix for BZ462046 - "Automate bdb 'recover' routine on restarting crashed
broker".
Modified: store/trunk/cpp/lib/Cursor.h
===================================================================
--- store/trunk/cpp/lib/Cursor.h 2008-10-28 16:16:20 UTC (rev 2692)
+++ store/trunk/cpp/lib/Cursor.h 2008-10-29 17:33:09 UTC (rev 2693)
@@ -24,6 +24,7 @@
#ifndef _Cursor_
#define _Cursor_
+#include <boost/shared_ptr.hpp>
#include "db-inc.h"
namespace mrg{
@@ -33,9 +34,12 @@
{
Dbc* cursor;
public:
+ typedef boost::shared_ptr<Db> db_ptr;
+
Cursor() : cursor(0) {}
virtual ~Cursor() { if(cursor) cursor->close(); }
- void open(Db& db, DbTxn* txn, u_int32_t flags = 0) { db.cursor(txn, &cursor,
flags); }
+
+ void open(db_ptr db, DbTxn* txn, u_int32_t flags = 0) { db->cursor(txn,
&cursor, flags); }
void close() { if(cursor) cursor->close(); cursor = 0; }
Dbc* get() { return cursor; }
Dbc* operator->() { return cursor; }
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2008-10-28 16:16:20 UTC (rev 2692)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2008-10-29 17:33:09 UTC (rev 2693)
@@ -63,13 +63,6 @@
MessageStoreImpl::MessageStoreImpl(const char* envpath) :
env(0),
- queueDb(&env, 0),
- configDb(&env, 0),
- exchangeDb(&env, 0),
- messageDb(&env, 0),
- mappingDb(&env, 0),
- bindingDb(&env, 0),
- generalDb(&env, 0),
numJrnlFiles(0),
autoJrnlExpand(false),
autoJrnlExpandMaxFiles(0),
@@ -186,14 +179,16 @@
// num-jfiles at max; disable auto-expand
autoJrnlExpand = false;
autoJrnlExpandMaxFiles = 0;
- QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName
<< " (" << p << ") must be higher than parameter "
<< numJrnlFilesParamName << " (" << numJrnlFiles <<
") which is at the maximum allowable value; disabling auto-expand.");
+ QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName
<< " (" << p << ") must be higher than parameter "
+ << numJrnlFilesParamName << " (" <<
numJrnlFiles << ") which is at the maximum allowable value; disabling
auto-expand.");
return;
}
if (p > JRNL_MAX_NUM_FILES) {
// auto-expand-max-jfiles higher than max allowable, adjust
autoJrnlExpand = true;
autoJrnlExpandMaxFiles = JRNL_MAX_NUM_FILES;
- QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName
<< " (" << p << ") is above allowable maximum ("
<< JRNL_MAX_NUM_FILES << "); changing this parameter to maximum
value.");
+ QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName
<< " (" << p << ") is above allowable maximum ("
+ << JRNL_MAX_NUM_FILES << "); changing this parameter to
maximum value.");
return;
}
if (p <= numJrnlFiles) {
@@ -201,7 +196,9 @@
u_int16_t incr = JRNL_MAX_NUM_FILES - numJrnlFiles > 1 ? 2 : 1;
autoJrnlExpand = true;
autoJrnlExpandMaxFiles = numJrnlFiles + incr;
- QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName
<< " (" << p << ") is not above that of parameter "
<< numJrnlFilesParamName << " (" << numJrnlFiles <<
"); changing this parameter to value of parameter " <<
numJrnlFilesParamName << " plus " << incr << " ("
<< autoJrnlExpandMaxFiles << ").");
+ QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName
<< " (" << p << ") is not above that of parameter "
+ << numJrnlFilesParamName << " (" <<
numJrnlFiles << "); changing this parameter to value of parameter "
<< numJrnlFilesParamName
+ << " plus " << incr << " ("
<< autoJrnlExpandMaxFiles << ").");
return;
}
// No adjustments req'd, set values
@@ -281,7 +278,9 @@
journal::jdir::create_dir(getBdbBaseDir());
try {
- env.open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN |
DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
+ env.set_errpfx("msgstore");
+ env.set_lg_regionmax(256000); // default = 65000
+ env.open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN |
DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON | DB_RECOVER, 0);
} catch (const DbException& e) {
if (e.get_errno() == DB_VERSION_MISMATCH)
THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of
bd4 does not match that which created the store database. "
@@ -292,6 +291,17 @@
TxnCtxt txn;
try {
+ // Databases are constructed here instead of the constructor so that the
DB_RECOVER flag can be used
+ // against the database environment. Recover can only be performed if no databases
have been created
+ // against the environment at the time of recovery, as recovery invalidates the
environment.
+ queueDb.reset(new Db(&env, 0));
+ configDb.reset(new Db(&env, 0));
+ exchangeDb.reset(new Db(&env, 0));
+ messageDb.reset(new Db(&env, 0));
+ mappingDb.reset(new Db(&env, 0));
+ bindingDb.reset(new Db(&env, 0));
+ generalDb.reset(new Db(&env, 0));
+
txn.begin(env, false);
open(queueDb, txn.get(), "queues.db", false);
open(configDb, txn.get(), "config.db", false);
@@ -340,20 +350,20 @@
}
}
-void MessageStoreImpl::open(Db& db,
+void MessageStoreImpl::open(db_ptr db,
DbTxn* txn,
const char* file,
bool dupKey)
{
- if(dupKey) db.set_flags(DB_DUPSORT);
- db.open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
- dbs.push_back(&db);
+ if(dupKey) db->set_flags(DB_DUPSORT);
+ db->open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
+ dbs.push_back(db);
}
MessageStoreImpl::~MessageStoreImpl()
{
try {
- for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
+ for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
}
if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
@@ -377,7 +387,7 @@
env.txn_begin(0, &txn, 0);
u_int32_t count;
- for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
+ for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->truncate(txn, &count, 0);
}
@@ -481,7 +491,7 @@
destroy(exchangeDb, exchange);
//need to also delete bindings
IdDbt key(exchange.getPersistenceId());
- bindingDb.del(0, &key, DB_AUTO_COMMIT);
+ bindingDb->del(0, &key, DB_AUTO_COMMIT);
}
void MessageStoreImpl::create(const PersistableConfig& general)
@@ -505,7 +515,7 @@
destroy(generalDb, general);
}
-bool MessageStoreImpl::create(Db& db,
+bool MessageStoreImpl::create(db_ptr db,
IdSequence& seq,
const Persistable& p)
{
@@ -517,7 +527,7 @@
TxnCtxt txn;
txn.begin(env, true);
try {
- status = db.put(txn.get(), &key, &value, DB_NOOVERWRITE);
+ status = db->put(txn.get(), &key, &value, DB_NOOVERWRITE);
txn.commit();
} catch (...) {
txn.abort();
@@ -531,10 +541,10 @@
}
}
-void MessageStoreImpl::destroy(Db& db, const Persistable& p)
+void MessageStoreImpl::destroy(db_ptr db, const Persistable& p)
{
IdDbt key(p.getPersistenceId());
- db.del(0, &key, DB_AUTO_COMMIT);
+ db->del(0, &key, DB_AUTO_COMMIT);
}
@@ -946,7 +956,7 @@
TxnCtxt txn;
txn.begin(env, true);
try {
- if (messageDb.get(txn.get(), &key, &value, 0) == DB_NOTFOUND) {
+ if (messageDb->get(txn.get(), &key, &value, 0) == DB_NOTFOUND) {
txn.abort();
THROW_STORE_EXCEPTION("Cannot load content. Message not known to
store!");
}
@@ -955,7 +965,7 @@
headerSize = value.buffer.getLong();
BufferValue header(headerSize, preamble_length);
- if (messageDb.get(txn.get(), &key, &header, 0) == DB_NOTFOUND) {
+ if (messageDb->get(txn.get(), &key, &header, 0) == DB_NOTFOUND) {
txn.abort();
THROW_STORE_EXCEPTION("Cannot load content. Message not known to
store!");
}
@@ -1156,7 +1166,7 @@
}
}
-u_int64_t MessageStoreImpl::getRecordSize(Db& db,
+u_int64_t MessageStoreImpl::getRecordSize(db_ptr db,
Dbt& key)
{
u_int64_t ret = 0;
@@ -1173,14 +1183,14 @@
}
u_int64_t MessageStoreImpl::getRecordSize(DbTxn* txn,
- Db& db,
+ db_ptr db,
Dbt& key)
{
Dbt peek;
peek.set_flags(DB_DBT_USERMEM);
peek.set_ulen(0);
try {
- int status = db.get(txn, &key, &peek, 0);
+ int status = db->get(txn, &key, &peek, 0);
if (status != DB_BUFFER_SMALL) {
THROW_STORE_EXCEPTION("Unexpected status code when determining record
length: " + std::string(DbEnv::strerror(status)));
}
@@ -1213,7 +1223,7 @@
value.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
value.set_doff(offset);
value.set_dlen(size);
- messageDb.put(txn.get(), &key, &value, 0);
+ messageDb->put(txn.get(), &key, &value, 0);
txn.commit();
} catch (const DbException& e) {
txn.abort();
@@ -1259,7 +1269,7 @@
value.set_ulen(length);
value.set_doff(realOffset);
value.set_dlen(length);
- int status = messageDb.get(txn.get(), &key, &value, 0);
+ int status = messageDb->get(txn.get(), &key, &value, 0);
if (status == DB_NOTFOUND) {
delete [] buffer;
THROW_STORE_EXCEPTION("Cannot load content. Message not known to
store!");
@@ -1369,7 +1379,7 @@
/// cct message db
if (newId) { // only store in Bd if first time message is stored
Dbt data(buff,size);
- messageDb.put(txn->get(), &messageId, &data, DB_NOOVERWRITE);
+ messageDb->put(txn->get(), &messageId, &data,
DB_NOOVERWRITE);
}
}
} catch (const journal::jexception& e) {
@@ -1454,7 +1464,7 @@
Dbt& messageId)
{
if (isUnused(cursor, messageId)) {
- messageDb.del(txn, &messageId, 0);
+ messageDb->del(txn, &messageId, 0);
return true;
} else {
return false;
@@ -1582,13 +1592,13 @@
return txn;
}
-void MessageStoreImpl::put(Db& db,
+void MessageStoreImpl::put(db_ptr db,
DbTxn* txn,
Dbt& key,
Dbt& value)
{
try {
- int status = db.put(txn, &key, &value, DB_NODUPDATA);
+ int status = db->put(txn, &key, &value, DB_NODUPDATA);
if (status == DB_KEYEXIST) {
THROW_STORE_EXCEPTION("duplicate data");
} else if (status) {
@@ -1599,7 +1609,7 @@
}
}
-bool MessageStoreImpl::deleteKeyValuePair(Db& db,
+bool MessageStoreImpl::deleteKeyValuePair(db_ptr db,
DbTxn* txn,
Dbt& key,
Dbt& value)
Modified: store/trunk/cpp/lib/MessageStoreImpl.h
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.h 2008-10-28 16:16:20 UTC (rev 2692)
+++ store/trunk/cpp/lib/MessageStoreImpl.h 2008-10-29 17:33:09 UTC (rev 2693)
@@ -54,6 +54,7 @@
class MessageStoreImpl : public qpid::broker::MessageStore, public
qpid::management::Manageable
{
public:
+ typedef boost::shared_ptr<Db> db_ptr;
struct StoreOptions : public qpid::Options {
StoreOptions(const std::string& name="Store Options");
std::string clusterName;
@@ -99,15 +100,15 @@
static const bool defAutoJrnlExpand = true;
static const u_int16_t defAutoJrnlExpandMaxFiles = 16;
- std::list<Db*> dbs;
+ std::list<db_ptr> dbs;
DbEnv env;
- Db queueDb;
- Db configDb;
- Db exchangeDb;
- Db messageDb;
- Db mappingDb;
- Db bindingDb;
- Db generalDb;
+ db_ptr queueDb;
+ db_ptr configDb;
+ db_ptr exchangeDb;
+ db_ptr messageDb;
+ db_ptr mappingDb;
+ db_ptr bindingDb;
+ db_ptr generalDb;
// Pointer to Transaction Prepared List (TPL) journal instance
boost::shared_ptr<TplJournalImpl> tplStorePtr;
@@ -204,14 +205,14 @@
Dbt& messageId);
bool isUnused(Cursor& cursor,
Dbt& messageId);
- void destroy(Db& db,
+ void destroy(db_ptr db,
const qpid::broker::Persistable& p);
- bool create(Db& db,
+ bool create(db_ptr db,
IdSequence& seq,
const qpid::broker::Persistable& p);
void completed(TxnCtxt& txn,
bool commit);
- void record2pcOp(Db& db,
+ void record2pcOp(db_ptr db,
TPCTxnCtxt& txn,
u_int64_t messageId,
u_int64_t queueId);
@@ -220,20 +221,20 @@
const qpid::broker::PersistableQueue& queue,
const std::string& key);
- u_int64_t getRecordSize(Db& db,
+ u_int64_t getRecordSize(db_ptr db,
Dbt& key);
u_int64_t getRecordSize(DbTxn* txn,
- Db& db,
+ db_ptr db,
Dbt& key);
- void put(Db& db,
+ void put(db_ptr db,
DbTxn* txn,
Dbt& key,
Dbt& value);
- bool deleteKeyValuePair(Db& db,
+ bool deleteKeyValuePair(db_ptr db,
DbTxn* txn,
Dbt& key,
Dbt& value);
- void open(Db& db,
+ void open(db_ptr db,
DbTxn* txn,
const char* file,
bool dupKey);