Author: gordonsim
Date: 2007-12-13 13:09:05 -0500 (Thu, 13 Dec 2007)
New Revision: 1479
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
Log:
Allow getRecordSize() to be called within a txn to avoid deadlocks
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-13 17:55:25 UTC (rev 1478)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-13 18:09:05 UTC (rev 1479)
@@ -644,7 +644,7 @@
msg->setPersistenceId(key.id);
u_int32_t contentOffset = headerSize + preamble_length;
- u_int64_t contentSize = getRecordSize(messageDb, key) - contentOffset;
+ u_int64_t contentSize = getRecordSize(txn.get(), messageDb, key) - contentOffset;
if (msg->loadContent(contentSize)) {
//now read the content
BufferValue content(contentSize, contentOffset);
@@ -805,13 +805,19 @@
}
}
+
u_int64_t BdbMessageStore::getRecordSize(Db& db, Dbt& key)
{
+ return getRecordSize(0, db, key);
+}
+
+u_int64_t BdbMessageStore::getRecordSize(DbTxn* txn, Db& db, Dbt& key)
+{
Dbt peek;
peek.set_flags(DB_DBT_USERMEM);
peek.set_ulen(0);
try {
- int status = db.get(0, &key, &peek, 0);
+ int status = db.get(txn, &key, &peek, 0);
if (status != DB_BUFFER_SMALL) {
THROW_STORE_EXCEPTION("Unexpected status code when determining record length: " + string(DbEnv::strerror(status)));
}
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-12-13 17:55:25 UTC (rev 1478)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-12-13 18:09:05 UTC (rev 1479)
@@ -126,6 +126,7 @@
void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
u_int64_t getRecordSize(Db& db, Dbt& key);
+ u_int64_t getRecordSize(DbTxn* txn, Db& db, Dbt& key);
void put(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
bool deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
void open(Db& db, DbTxn* txn, const char* file, bool dupKey);