Author: kpvdr
Date: 2009-01-27 16:41:40 -0500 (Tue, 27 Jan 2009)
New Revision: 3075
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/MessageStoreImpl.cpp
Log:
Fix for BZ 481822 - "Store: Message content loss during staging/loading if header
size changes in interim".
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2009-01-27 15:09:50 UTC (rev 3074)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2009-01-27 21:41:40 UTC (rev 3075)
@@ -237,7 +237,7 @@
#define MAX_AIO_SLEEPS 1000 // 10 sec
#define AIO_SLEEP_TIME 10000 // 10 ms
bool
-JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t offset, size_t length)
+JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length)
{
if (_dtok.rid() != rid)
{
@@ -307,6 +307,7 @@
}
if (_external)
return false;
+ u_int32_t offset = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(u_int32_t)).getLong() + sizeof(u_int32_t);
if (offset + length > _dlen) {
data.append((const char*)_datap + offset, _dlen - offset);
} else {
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2009-01-27 15:09:50 UTC (rev 3074)
+++ store/trunk/cpp/lib/JournalImpl.h 2009-01-27 21:41:40 UTC (rev 3075)
@@ -148,7 +148,7 @@
// Temporary fn to read and save last msg read from journal so it can be assigned
// in chunks. To be replaced when coding to do this direct from the journal is ready.
// Returns true if the record is extern, false if local.
- bool loadMsgContent(u_int64_t rid, std::string& data, size_t offset, size_t length);
+ bool loadMsgContent(u_int64_t rid, std::string& data, size_t length);
// Overrides for write inactivity timer
void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-01-27 15:09:50 UTC (rev 3074)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-01-27 21:41:40 UTC (rev 3075)
@@ -1258,14 +1258,13 @@
u_int32_t length)
{
checkInit();
- u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/ + msg->encodedHeaderSize();
u_int64_t messageId (msg->getPersistenceId());
if (messageId != 0) {
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
if (jc && jc->is_enqueued(messageId) ) {
- if (jc->loadMsgContent(messageId, data, realOffset, length)) {
+ if (jc->loadMsgContent(messageId, data, length)) {
return;
}
}
@@ -1278,11 +1277,26 @@
Dbt key (&messageId, sizeof(messageId));
char *buffer = new char[length];
Dbt value(buffer, length);
+
+ // Read the first 4 bytes (u_int32_t) which is the header size.
value.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
+ value.set_ulen(sizeof(u_int32_t));
+ value.set_doff(0);
+ value.set_dlen(sizeof(u_int32_t));
+ int status = messageDb->get(txn.get(), &key, &value, 0);
+ if (status == DB_NOTFOUND) {
+ delete [] buffer;
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ }
+ u_int32_t hdr_size = Buffer(buffer, sizeof(u_int32_t)).getLong();
+
+ // Now read the data.
+ u_int64_t realOffset = sizeof(u_int32_t) + hdr_size + offset;
+ value.set_flags(DB_DBT_USERMEM | DB_DBT_PARTIAL);
value.set_ulen(length);
value.set_doff(realOffset);
value.set_dlen(length);
- int status = messageDb->get(txn.get(), &key, &value, 0);
+ status = messageDb->get(txn.get(), &key, &value, 0);
if (status == DB_NOTFOUND) {
delete [] buffer;
THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
Show replies by date