Author: kpvdr
Date: 2008-07-30 16:28:39 -0400 (Wed, 30 Jul 2008)
New Revision: 2238
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
Log:
Backports of trunk r.2236,2237: Improvements on fix for BZ457239, also tightening up on
exception handling around transactions in BdbMessageStore. Also added txn to one additional
call omitted from previous fixes.
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-30 20:09:46 UTC (rev 2237)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-30 20:28:39 UTC (rev 2238)
@@ -249,7 +249,7 @@
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error opening databases", e);
- } catch (const std::exception& e) {
+ } catch (...) {
txn.abort();
throw;
}
@@ -359,7 +359,6 @@
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ":
create() failed: " + e.what());
}
try {
- qpid::sys::Mutex::ScopedLock sl(bdbQueueLock);
if (!create(queueDb, queueIdSequence, queue)) {
THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
}
@@ -435,7 +434,16 @@
Dbt key(&id, sizeof(id));
BufferValue value (p);
- int status = db.put(0, &key, &value, DB_NOOVERWRITE | DB_AUTO_COMMIT);
+ int status;
+ TxnCtxt txn;
+ txn.begin(env, true);
+ try {
+ status = db.put(txn.get(), &key, &value, DB_NOOVERWRITE);
+ txn.commit();
+ } catch (...) {
+ txn.abort();
+ throw;
+ }
if (status == DB_KEYEXIST) {
return false;
} else {
@@ -461,8 +469,13 @@
BindingDbt value(e, q, k, a);
TxnCtxt txn;
txn.begin(env, true);
- put(bindingDb, txn.get(), key, value);
- txn.commit();
+ try {
+ put(bindingDb, txn.get(), key, value);
+ txn.commit();
+ } catch (...) {
+ txn.abort();
+ throw;
+ }
}
void BdbMessageStore::unbind(const PersistableExchange& e,
@@ -498,10 +511,12 @@
recoverGeneral(txn, registry);
txn.commit();
-
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error on recovery", e);
+ } catch (...) {
+ txn.abort();
+ throw;
}
//recover transactions:
@@ -785,7 +800,7 @@
value.buffer.record();
TxnCtxt txn;
- txn.begin(env);
+ txn.begin(env, true);
try {
if (messageDb.get(txn.get(), &key, &value, 0) == DB_NOTFOUND) {
txn.abort();
@@ -804,6 +819,9 @@
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION("Unexpected BDB error in
BdbMessageStore::getExternMessage(): " + string(e.what()));
+ } catch (...) {
+ txn.abort();
+ throw;
}
return ret;
}
@@ -922,7 +940,7 @@
store(NULL, &txn, key, msg, true);
msg->setPersistenceId(messageId);
txn.commit();
- } catch (const std::exception& e) {
+ } catch (...) {
txn.abort();
throw;
}
@@ -943,6 +961,9 @@
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error destroying message", e);
+ } catch (...) {
+ txn.abort();
+ throw;
}
}
}
@@ -950,7 +971,17 @@
u_int64_t BdbMessageStore::getRecordSize(Db& db,
Dbt& key)
{
- return getRecordSize(0, db, key);
+ u_int64_t ret = 0;
+ TxnCtxt txn;
+ txn.begin(env, true);
+ try {
+ ret = getRecordSize(txn.get(), db, key);
+ txn.commit();
+ } catch (...) {
+ txn.abort();
+ throw;
+ }
+ return ret;
}
u_int64_t BdbMessageStore::getRecordSize(DbTxn* txn,
@@ -977,6 +1008,8 @@
checkInit();
u_int64_t messageId (msg->getPersistenceId());
if (messageId != 0) {
+ TxnCtxt txn;
+ txn.begin(env, true);
try {
Dbt key (&messageId, sizeof(messageId));
u_int64_t offset = getRecordSize(messageDb, key);
@@ -992,9 +1025,14 @@
value.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
value.set_doff(offset);
value.set_dlen(size);
- messageDb.put(0, &key, &value, DB_AUTO_COMMIT);
+ messageDb.put(txn.get(), &key, &value, 0);
+ txn.commit();
} catch (const DbException& e) {
+ txn.abort();
THROW_STORE_EXCEPTION_2("Error appending content", e);
+ } catch (...) {
+ txn.abort();
+ throw;
}
} else {
THROW_STORE_EXCEPTION("Cannot append content. Message not known to
store!");
@@ -1035,16 +1073,18 @@
value.set_dlen(length);
int status = messageDb.get(txn.get(), &key, &value, 0);
if (status == DB_NOTFOUND) {
- txn.abort();
delete [] buffer;
THROW_STORE_EXCEPTION("Cannot load content. Message not known to
store!");
- } else {
- txn.commit();
- data.assign(buffer, value.get_size());
- delete [] buffer;
}
+ data.assign(buffer, value.get_size());
+ delete [] buffer;
+ txn.commit();
} catch (const DbException& e) {
+ txn.abort();
THROW_STORE_EXCEPTION_2("Error loading content", e);
+ } catch (...) {
+ txn.abort();
+ throw;
}
} else {
THROW_STORE_EXCEPTION("Cannot load content. Message not known to
store!");
@@ -1378,26 +1418,32 @@
TxnCtxt txn;
txn.begin(env, true);
try {
- Cursor bindings;
- bindings.open(bindingDb, txn.get());
+ {
+ Cursor bindings;
+ bindings.open(bindingDb, txn.get());
- IdDbt key;
- Dbt value;
- while (bindings.next(key, value)) {
- Buffer buffer(reinterpret_cast<char*>(value.get_data()),
value.get_size());
- if (buffer.available() < 8) {
- THROW_STORE_EXCEPTION("Not enough data for binding");
+ IdDbt key;
+ Dbt value;
+ while (bindings.next(key, value)) {
+ Buffer buffer(reinterpret_cast<char*>(value.get_data()),
value.get_size());
+ if (buffer.available() < 8) {
+ THROW_STORE_EXCEPTION("Not enough data for binding");
+ }
+ uint64_t queueId = buffer.getLongLong();
+ if (queue.getPersistenceId() == queueId) {
+ bindings->del(0);
+ QPID_LOG(debug, "Deleting binding for " <<
queue.getName() << " " << key.id << "->" <<
queueId);
+ }
}
- uint64_t queueId = buffer.getLongLong();
- if (queue.getPersistenceId() == queueId) {
- bindings->del(0);
- QPID_LOG(debug, "Deleting binding for " <<
queue.getName() << " " << key.id << "->" <<
queueId);
- }
}
+ txn.commit();
} catch (const std::exception& e) {
+ txn.abort();
THROW_STORE_EXCEPTION_2("Error deleting bindings", e.what());
+ } catch (...) {
+ txn.abort();
+ throw;
}
- txn.commit();
QPID_LOG(debug, "Deleted all bindings for " << queue.getName()
<< ":" << queue.getPersistenceId());
}
@@ -1408,33 +1454,39 @@
TxnCtxt txn;
txn.begin(env, true);
try {
- Cursor bindings;
- bindings.open(bindingDb, txn.get());
+ {
+ Cursor bindings;
+ bindings.open(bindingDb, txn.get());
- IdDbt key(exchange.getPersistenceId());
- Dbt value;
+ IdDbt key(exchange.getPersistenceId());
+ Dbt value;
- for (int status = bindings->get(&key, &value, DB_SET); status == 0;
status = bindings->get(&key, &value, DB_NEXT_DUP)) {
- Buffer buffer(reinterpret_cast<char*>(value.get_data()),
value.get_size());
- if (buffer.available() < 8) {
- THROW_STORE_EXCEPTION("Not enough data for binding");
- }
- uint64_t queueId = buffer.getLongLong();
- if (queue.getPersistenceId() == queueId) {
- std::string q;
- std::string k;
- buffer.getShortString(q);
- buffer.getShortString(k);
- if (bkey == k) {
- bindings->del(0);
- QPID_LOG(debug, "Deleting binding for " <<
queue.getName() << " " << key.id << "->" <<
queueId);
+ for (int status = bindings->get(&key, &value, DB_SET); status ==
0; status = bindings->get(&key, &value, DB_NEXT_DUP)) {
+ Buffer buffer(reinterpret_cast<char*>(value.get_data()),
value.get_size());
+ if (buffer.available() < 8) {
+ THROW_STORE_EXCEPTION("Not enough data for binding");
}
+ uint64_t queueId = buffer.getLongLong();
+ if (queue.getPersistenceId() == queueId) {
+ std::string q;
+ std::string k;
+ buffer.getShortString(q);
+ buffer.getShortString(k);
+ if (bkey == k) {
+ bindings->del(0);
+ QPID_LOG(debug, "Deleting binding for " <<
queue.getName() << " " << key.id << "->" <<
queueId);
+ }
+ }
}
}
+ txn.commit();
} catch (const std::exception& e) {
+ txn.abort();
THROW_STORE_EXCEPTION_2("Error deleting bindings", e.what());
+ } catch (...) {
+ txn.abort();
+ throw;
}
- txn.commit();
}
string BdbMessageStore::getJrnlBaseDir()
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-30 20:09:46 UTC (rev 2237)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-30 20:28:39 UTC (rev 2238)
@@ -115,7 +115,6 @@
static qpid::sys::Duration defJournalFlushTimeout;
qpid::management::Store::shared_ptr mgmtObject;
qpid::sys::Mutex jrnlCreateLock;
- qpid::sys::Mutex bdbQueueLock;
// Parameter validation and calculation
static u_int16_t chkJrnlNumFilesParam(const u_int16_t param,