[rhmessaging-commits] rhmessaging commits: r2236 - store/trunk/cpp/lib.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Wed Jul 30 14:32:51 EDT 2008
Author: kpvdr
Date: 2008-07-30 14:32:51 -0400 (Wed, 30 Jul 2008)
New Revision: 2236
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
Log:
Improvements to the fix for BZ457239; also tightens up exception handling around transactions in BdbMessageStore.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-30 16:35:25 UTC (rev 2235)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-30 18:32:51 UTC (rev 2236)
@@ -255,7 +255,7 @@
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error opening databases", e);
- } catch (const std::exception& e) {
+ } catch (...) {
txn.abort();
throw;
}
@@ -373,7 +373,6 @@
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what());
}
try {
- qpid::sys::Mutex::ScopedLock sl(bdbQueueLock);
if (!create(queueDb, queueIdSequence, queue)) {
THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
}
@@ -449,7 +448,16 @@
Dbt key(&id, sizeof(id));
BufferValue value (p);
- int status = db.put(0, &key, &value, DB_NOOVERWRITE | DB_AUTO_COMMIT);
+ int status;
+ TxnCtxt txn;
+ txn.begin(env, true);
+ try {
+ status = db.put(txn.get(), &key, &value, DB_NOOVERWRITE | DB_AUTO_COMMIT);
+ txn.commit();
+ } catch (...) {
+ txn.abort();
+ throw;
+ }
if (status == DB_KEYEXIST) {
return false;
} else {
@@ -475,8 +483,13 @@
BindingDbt value(e, q, k, a);
TxnCtxt txn;
txn.begin(env, true);
- put(bindingDb, txn.get(), key, value);
- txn.commit();
+ try {
+ put(bindingDb, txn.get(), key, value);
+ txn.commit();
+ } catch (...) {
+ txn.abort();
+ throw;
+ }
}
void BdbMessageStore::unbind(const PersistableExchange& e,
@@ -512,10 +525,12 @@
recoverGeneral(txn, registry);
txn.commit();
-
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error on recovery", e);
+ } catch (...) {
+ txn.abort();
+ throw;
}
//recover transactions:
@@ -811,7 +826,7 @@
value.buffer.record();
TxnCtxt txn;
- txn.begin(env);
+ txn.begin(env, true);
try {
if (messageDb.get(txn.get(), &key, &value, 0) == DB_NOTFOUND) {
txn.abort();
@@ -831,6 +846,9 @@
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION("Unexpected BDB error in BdbMessageStore::getExternMessage(): " + string(e.what()));
+ } catch (...) {
+ txn.abort();
+ throw;
}
return ret;
}
@@ -949,7 +967,7 @@
store(NULL, &txn, key, msg, true);
msg->setPersistenceId(messageId);
txn.commit();
- } catch (const std::exception& e) {
+ } catch (...) {
txn.abort();
throw;
}
@@ -970,6 +988,9 @@
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error destroying message", e);
+ } catch (...) {
+ txn.abort();
+ throw;
}
}
}
@@ -979,11 +1000,11 @@
{
u_int64_t ret = 0;
TxnCtxt txn;
- txn.begin(env);
+ txn.begin(env, true);
try {
ret = getRecordSize(txn.get(), db, key);
txn.commit();
- } catch (const std::exception& e) {
+ } catch (...) {
txn.abort();
throw;
}
@@ -1072,16 +1093,18 @@
value.set_dlen(length);
int status = messageDb.get(txn.get(), &key, &value, 0);
if (status == DB_NOTFOUND) {
- txn.abort();
delete [] buffer;
THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
- } else {
- txn.commit();
- data.assign(buffer, value.get_size());
- delete [] buffer;
}
+ data.assign(buffer, value.get_size());
+ delete [] buffer;
+ txn.commit();
} catch (const DbException& e) {
+ txn.abort();
THROW_STORE_EXCEPTION_2("Error loading content", e);
+ } catch (...) {
+ txn.abort();
+ throw;
}
} else {
THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
@@ -1426,26 +1449,32 @@
TxnCtxt txn;
txn.begin(env, true);
try {
- Cursor bindings;
- bindings.open(bindingDb, txn.get());
+ {
+ Cursor bindings;
+ bindings.open(bindingDb, txn.get());
- IdDbt key;
- Dbt value;
- while (bindings.next(key, value)) {
- Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
- if (buffer.available() < 8) {
- THROW_STORE_EXCEPTION("Not enough data for binding");
+ IdDbt key;
+ Dbt value;
+ while (bindings.next(key, value)) {
+ Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+ if (buffer.available() < 8) {
+ THROW_STORE_EXCEPTION("Not enough data for binding");
+ }
+ uint64_t queueId = buffer.getLongLong();
+ if (queue.getPersistenceId() == queueId) {
+ bindings->del(0);
+ QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
+ }
}
- uint64_t queueId = buffer.getLongLong();
- if (queue.getPersistenceId() == queueId) {
- bindings->del(0);
- QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
- }
}
+ txn.commit();
} catch (const std::exception& e) {
+ txn.abort();
THROW_STORE_EXCEPTION_2("Error deleting bindings", e.what());
+ } catch (...) {
+ txn.abort();
+ throw;
}
- txn.commit();
QPID_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId());
}
@@ -1456,33 +1485,39 @@
TxnCtxt txn;
txn.begin(env, true);
try {
- Cursor bindings;
- bindings.open(bindingDb, txn.get());
+ {
+ Cursor bindings;
+ bindings.open(bindingDb, txn.get());
- IdDbt key(exchange.getPersistenceId());
- Dbt value;
+ IdDbt key(exchange.getPersistenceId());
+ Dbt value;
- for (int status = bindings->get(&key, &value, DB_SET); status == 0; status = bindings->get(&key, &value, DB_NEXT_DUP)) {
- Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
- if (buffer.available() < 8) {
- THROW_STORE_EXCEPTION("Not enough data for binding");
- }
- uint64_t queueId = buffer.getLongLong();
- if (queue.getPersistenceId() == queueId) {
- std::string q;
- std::string k;
- buffer.getShortString(q);
- buffer.getShortString(k);
- if (bkey == k) {
- bindings->del(0);
- QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
+ for (int status = bindings->get(&key, &value, DB_SET); status == 0; status = bindings->get(&key, &value, DB_NEXT_DUP)) {
+ Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
+ if (buffer.available() < 8) {
+ THROW_STORE_EXCEPTION("Not enough data for binding");
}
+ uint64_t queueId = buffer.getLongLong();
+ if (queue.getPersistenceId() == queueId) {
+ std::string q;
+ std::string k;
+ buffer.getShortString(q);
+ buffer.getShortString(k);
+ if (bkey == k) {
+ bindings->del(0);
+ QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
+ }
+ }
}
}
+ txn.commit();
} catch (const std::exception& e) {
+ txn.abort();
THROW_STORE_EXCEPTION_2("Error deleting bindings", e.what());
+ } catch (...) {
+ txn.abort();
+ throw;
}
- txn.commit();
}
string BdbMessageStore::getJrnlBaseDir()
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-07-30 16:35:25 UTC (rev 2235)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-07-30 18:32:51 UTC (rev 2236)
@@ -115,7 +115,6 @@
static qpid::sys::Duration defJournalFlushTimeout;
qpid::management::Store* mgmtObject;
qpid::sys::Mutex jrnlCreateLock;
- qpid::sys::Mutex bdbQueueLock;
// Parameter validation and calculation
static u_int16_t chkJrnlNumFilesParam(const u_int16_t param,
More information about the rhmessaging-commits
mailing list