Author: gordonsim
Date: 2008-04-01 11:16:41 -0400 (Tue, 01 Apr 2008)
New Revision: 1817
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
Log:
Batch recovery of a large number of messages into fixed-size transactions to avoid
problems recovering large message dbs.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-04-01 05:45:34 UTC (rev 1816)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-04-01 15:16:41 UTC (rev 1817)
@@ -619,9 +619,14 @@
// bdb version
-void BdbMessageStore::recoverMessages(TxnCtxt& txn, RecoveryManager& recovery,
queue_index& index,
+void BdbMessageStore::recoverMessages(TxnCtxt&, RecoveryManager& recovery,
queue_index& index,
txn_list& locked, message_index& prepared)
{
+ //have to create a new txn here, and commit in batches to avoid
+ //problems with large message databases
+ TxnCtxt txn;
+ txn.begin(env);
+
Cursor messages;
messages.open(messageDb, txn.get());
@@ -631,17 +636,27 @@
BufferValue value(preamble_length, 0);
value.buffer.record();
+ uint count(0);
while (messages.next(key, value)) {
+ if (++count % 1000 == 0) {
+ QPID_LOG(debug, "Recovering " << count << "th
message...");
+ //reset cursor && txn:
+ messages.close();
+ txn.commit();
+ txn.begin(env);
+ messages.open(messageDb, txn.get());
+ messages->get(&key, &value, DB_SET);
+ }
//read header only to begin with
u_int32_t headerSize = value.buffer.getLong();
value.buffer.restore();
BufferValue header(headerSize, preamble_length);
messages.current(key, header);
-
+
RecoverableMessage::shared_ptr msg = recovery.recoverMessage(header.buffer);
msg->setPersistenceId(key.id);
-
+
u_int32_t contentOffset = headerSize + preamble_length;
u_int64_t contentSize = getRecordSize(txn.get(), messageDb, key) -
contentOffset;
if (msg->loadContent(contentSize)) {
@@ -650,7 +665,7 @@
messages.current(key, content);
msg->decodeContent(content.buffer);
}
-
+
//find all the queues into which this message has been enqueued
if (enqueueMessage(txn, key, msg, index, locked, prepared) == 0) {
//message not referenced anywhere - can delete
@@ -688,6 +703,7 @@
count++;
}
}
+ mappings.close();
return count;
}
@@ -751,14 +767,6 @@
while (c.next(key, value)) {
std::string xid(reinterpret_cast<char*>(key.get_data()), key.get_size());
LockedMappings::add(mappings, xid, value.queueId(), value.messageId());
- /*
- txn_lock_map::iterator i = mappings.find(xid);
- if (i == mappings.end()) {
- LockedMappings::shared_ptr ptr(new LockedMappings());
- i = mappings.insert(std::make_pair(xid, ptr)).first;
- }
- i->second->add(value.queueId(), value.messageId());
- */
}
}