[rhmessaging-commits] rhmessaging commits: r3075 - store/trunk/cpp/lib.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Jan 27 16:41:40 EST 2009


Author: kpvdr
Date: 2009-01-27 16:41:40 -0500 (Tue, 27 Jan 2009)
New Revision: 3075

Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/MessageStoreImpl.cpp
Log:
Fix for BZ 481822 - "Store: Message content loss during staging/loading if header size changes in interim".

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2009-01-27 15:09:50 UTC (rev 3074)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2009-01-27 21:41:40 UTC (rev 3075)
@@ -237,7 +237,7 @@
 #define MAX_AIO_SLEEPS 1000  // 10 sec 
 #define AIO_SLEEP_TIME 10000 // 10 ms
 bool
-JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t offset, size_t length)
+JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length)
 {
     if (_dtok.rid() != rid)
     {
@@ -307,6 +307,7 @@
     }
     if (_external)
         return false;
+    u_int32_t offset = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(u_int32_t)).getLong() + sizeof(u_int32_t);
     if (offset + length > _dlen) {
         data.append((const char*)_datap + offset, _dlen - offset);
     } else {

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2009-01-27 15:09:50 UTC (rev 3074)
+++ store/trunk/cpp/lib/JournalImpl.h	2009-01-27 21:41:40 UTC (rev 3075)
@@ -148,7 +148,7 @@
             // Temporary fn to read and save last msg read from journal so it can be assigned
             // in chunks. To be replaced when coding to do this direct from the journal is ready.
             // Returns true if the record is extern, false if local.
-            bool loadMsgContent(u_int64_t rid, std::string& data, size_t offset, size_t length);
+            bool loadMsgContent(u_int64_t rid, std::string& data, size_t length);
 
             // Overrides for write inactivity timer
             void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,

Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp	2009-01-27 15:09:50 UTC (rev 3074)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp	2009-01-27 21:41:40 UTC (rev 3075)
@@ -1258,14 +1258,13 @@
                                   u_int32_t length)
 {
     checkInit();
-    u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/ + msg->encodedHeaderSize();
     u_int64_t messageId (msg->getPersistenceId());
 
     if (messageId != 0) {
         try {
             JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
             if (jc && jc->is_enqueued(messageId) ) {
-                if (jc->loadMsgContent(messageId, data, realOffset, length)) {
+                if (jc->loadMsgContent(messageId, data, length)) {
                     return;
                 }
             }
@@ -1278,11 +1277,26 @@
             Dbt key (&messageId, sizeof(messageId));
             char *buffer = new char[length];
             Dbt value(buffer, length);
+            
+            // Read the first 4 bytes (u_int32_t) which is the header size.
             value.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
+            value.set_ulen(sizeof(u_int32_t));
+            value.set_doff(0);
+            value.set_dlen(sizeof(u_int32_t));
+            int status = messageDb->get(txn.get(), &key, &value, 0);
+            if (status == DB_NOTFOUND) {
+                delete [] buffer;
+                THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+            }
+            u_int32_t hdr_size = Buffer(buffer, sizeof(u_int32_t)).getLong();
+
+            // Now read the data.
+            u_int64_t realOffset = sizeof(u_int32_t) + hdr_size + offset;
+            value.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
             value.set_ulen(length);
             value.set_doff(realOffset);
             value.set_dlen(length);
-            int status = messageDb->get(txn.get(), &key, &value, 0);
+            status = messageDb->get(txn.get(), &key, &value, 0);
             if (status == DB_NOTFOUND) {
                 delete [] buffer;
                 THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");




More information about the rhmessaging-commits mailing list