[rhmessaging-commits] rhmessaging commits: r1817 - store/trunk/cpp/lib.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Apr 1 11:16:41 EDT 2008


Author: gordonsim
Date: 2008-04-01 11:16:41 -0400 (Tue, 01 Apr 2008)
New Revision: 1817

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
Log:
Batch recovery of large number of messages into fix sized transactions to avoid problems recovering large message dbs.



Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2008-04-01 05:45:34 UTC (rev 1816)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2008-04-01 15:16:41 UTC (rev 1817)
@@ -619,9 +619,14 @@
 
 
 // bdb version
-void BdbMessageStore::recoverMessages(TxnCtxt& txn, RecoveryManager& recovery, queue_index& index,
+void BdbMessageStore::recoverMessages(TxnCtxt&, RecoveryManager& recovery, queue_index& index,
                                       txn_list& locked, message_index& prepared)
 {
+    //have to create a new txn here, and commit in batches to avoid
+    //problems with large message databases
+    TxnCtxt txn;
+    txn.begin(env);
+
     Cursor messages;
     messages.open(messageDb, txn.get());
 
@@ -631,17 +636,27 @@
 
     BufferValue value(preamble_length, 0);
     value.buffer.record();
+    uint count(0);
     while (messages.next(key, value)) {
+        if (++count % 1000 == 0) {
+            QPID_LOG(debug, "Recovering " << count << "th message...");
+            //reset cursor && txn:
+            messages.close();
+            txn.commit();
+            txn.begin(env);
+            messages.open(messageDb, txn.get());
+            messages->get(&key, &value, DB_SET);
+        }
         //read header only to begin with
         u_int32_t headerSize = value.buffer.getLong();
         value.buffer.restore();
 
         BufferValue header(headerSize, preamble_length);
         messages.current(key, header);
-
+        
         RecoverableMessage::shared_ptr msg = recovery.recoverMessage(header.buffer);
         msg->setPersistenceId(key.id);
-        
+
         u_int32_t contentOffset = headerSize + preamble_length;
         u_int64_t contentSize = getRecordSize(txn.get(), messageDb, key) - contentOffset;
         if (msg->loadContent(contentSize)) {
@@ -650,7 +665,7 @@
             messages.current(key, content);
             msg->decodeContent(content.buffer);
         }
-        
+
         //find all the queues into which this message has been enqueued
         if (enqueueMessage(txn, key, msg, index, locked, prepared) == 0) {
             //message not referenced anywhere - can delete
@@ -688,6 +703,7 @@
             count++;
         }
     }
+    mappings.close();
     return count;
 }
 
@@ -751,14 +767,6 @@
     while (c.next(key, value)) {
         std::string xid(reinterpret_cast<char*>(key.get_data()), key.get_size());
         LockedMappings::add(mappings, xid, value.queueId(), value.messageId());
-        /*
-          txn_lock_map::iterator i = mappings.find(xid);
-          if (i == mappings.end()) {
-          LockedMappings::shared_ptr ptr(new LockedMappings());
-          i = mappings.insert(std::make_pair(xid, ptr)).first;
-          }
-          i->second->add(value.queueId(), value.messageId());
-        */
     }
 }
 




More information about the rhmessaging-commits mailing list