Author: cctrieloff
Date: 2007-11-08 13:48:57 -0500 (Thu, 08 Nov 2007)
New Revision: 1270
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/tests/SimpleTest.cpp
Log:
- full support staging in async
- transient staging not optimized for AIO yet.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-08 15:25:15 UTC (rev 1269)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-08 18:48:57 UTC (rev 1270)
@@ -722,7 +722,7 @@
readXids(prepareXidDb, xids);
}
-void BdbMessageStore::stage(PersistableMessage& msg)
+void BdbMessageStore::stage( PersistableMessage& msg)
{
checkInit();
TxnCtxt txn;
@@ -804,13 +804,22 @@
THROW_STORE_EXCEPTION("Cannot append content. Message not known to
store!");
}
}
-void BdbMessageStore::loadContent(const PersistableMessage& msg, std::string&
data, u_int64_t offset, u_int32_t length)
+
+void BdbMessageStore::loadContent(const qpid::broker::PersistableQueue& queue,
+ const PersistableMessage& msg, std::string& data, u_int64_t offset,
u_int32_t length)
{
checkInit();
u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/+
msg.encodedHeaderSize();
u_int64_t messageId (msg.getPersistenceId());
+
if (messageId != 0) {
try {
+ JournalImpl* jc =
static_cast<JournalImpl*>(queue.getExternalQueueStore());
+ if (jc && jc->is_enqueued(messageId) ){
+ if (jc->loadMsgContent(messageId, data, realOffset, length)){
+ return;
+ }
+ }
Dbt key (&messageId, sizeof(messageId));
char *buffer = new char[length];
Dbt value(buffer, length);
@@ -820,15 +829,17 @@
value.set_dlen(length);
int status = messageDb.get(0, &key, &value, 0);
if (status == DB_NOTFOUND) {
- delete [] buffer;
+ delete [] buffer;
THROW_STORE_EXCEPTION("Cannot load content. Message not known to
store!");
} else {
data.assign(buffer, value.get_size());
- delete [] buffer;
+ delete [] buffer;
}
} catch (DbException& e) {
THROW_STORE_EXCEPTION_2("Error loading content", e);
- }
+ } catch (journal::jexception& e) {
+ THROW_STORE_EXCEPTION("Error loading content" /*, e*/);
+ }
} else {
THROW_STORE_EXCEPTION("Cannot load content. Message not known to
store!");
}
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-11-08 15:25:15 UTC (rev 1269)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-11-08 18:48:57 UTC (rev 1270)
@@ -153,7 +153,8 @@
void stage(qpid::broker::PersistableMessage& msg);
void destroy(qpid::broker::PersistableMessage& msg);
void appendContent(const qpid::broker::PersistableMessage& msg, const
std::string& data);
- void loadContent(const qpid::broker::PersistableMessage& msg,
std::string& data, u_int64_t offset, u_int32_t length);
+ void loadContent(const qpid::broker::PersistableQueue& queue, const
qpid::broker::PersistableMessage& msg,
+ std::string& data, u_int64_t offset, u_int32_t length);
void enqueue(qpid::broker::TransactionContext* ctxt,
qpid::broker::PersistableMessage& msg,
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-11-08 15:25:15 UTC (rev 1269)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-11-08 18:48:57 UTC (rev 1270)
@@ -153,14 +153,14 @@
}
}
if (_external)
- return true;
+ return false;
if (offset + length > _dlen) {
std::stringstream ss;
ss << "loadMsgContent(): offset + length exceeds available message
size";
throw jexception(ss.str());
}
data.append((const char*)_datap + offset, length);
- return false;
+ return true;
}
const iores
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2007-11-08 15:25:15 UTC (rev 1269)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2007-11-08 18:48:57 UTC (rev 1270)
@@ -268,7 +268,8 @@
CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, msg->contentSize());
DummyHandler handler;
- MessageDelivery::deliver(msg, handler, 0,
MessageDelivery::getBasicConsumeToken("ignore"), 100);
+ QueuedMessage qm(queue.get(),msg,0);
+ MessageDelivery::deliver(qm, handler, 0,
MessageDelivery::getBasicConsumeToken("ignore"), 100);
CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
AMQContentBody*
contentBody(dynamic_cast<AMQContentBody*>(handler.frames[2].getBody()));
CPPUNIT_ASSERT(contentBody);
@@ -314,7 +315,7 @@
}
void testStagingSync() {testStaging(false);}
- void testStagingAsync() {std::cout << std::endl << "Missing Async
test!!" << std::endl << std:: flush;}
+ void testStagingAsync() {testStaging(true);}
void testStaging(bool async)
{
const string name("MyDurableQueue");
@@ -354,7 +355,8 @@
//load it (without recovery)
DummyHandler handler;
- MessageDelivery::deliver(msg, handler, 0,
+ QueuedMessage qm(&queue, msg, 0);
+ MessageDelivery::deliver(qm, handler, 0,
MessageDelivery::getBasicConsumeToken("ignore"), 20);//52 chars of data, i.e. 2
chunks of 20 and one of 12
CPPUNIT_ASSERT(handler.frames.size() > 2);
string loaded;
@@ -394,7 +396,8 @@
//load lazily
DummyHandler handler;
- MessageDelivery::deliver(msg, handler, 0,
MessageDelivery::getBasicConsumeToken("ignore"), 20);//52 chars of data, i.e. 2
chunks of 20 and one of 12
+ QueuedMessage qm(queue.get(),msg,0);
+ MessageDelivery::deliver(qm, handler, 0,
MessageDelivery::getBasicConsumeToken("ignore"), 20);//52 chars of data, i.e. 2
chunks of 20 and one of 12
CPPUNIT_ASSERT(handler.frames.size() > 2);
string loaded;
@@ -411,7 +414,7 @@
}
void testDestroyStagedMessageSync() {testDestroyStagedMessage(false);}
- void testDestroyStagedMessageAsync() {std::cout << std::endl <<
"Missing Async test!!" << std::endl << std:: flush;}
+ void testDestroyStagedMessageAsync() {testDestroyStagedMessage(true);}
void testDestroyStagedMessage(bool async)
{
BdbMessageStore store;
@@ -427,14 +430,15 @@
try {
string loaded;
- store.loadContent(*msg, loaded, 0, data.length());
+ Queue queue("dummy", 0, &store, 0);
+ store.loadContent(queue, *msg, loaded, 0, data.length());
CPPUNIT_ASSERT(false);
} catch (StoreException& e) {
}
}
void testDestroyEnqueuedMessageSync() {testDestroyEnqueuedMessage(false);}
- void testDestroyEnqueuedMessageAsync() {std::cout << std::endl <<
"Missing Async test!!" << std::endl << std:: flush;}
+ void testDestroyEnqueuedMessageAsync() {testDestroyEnqueuedMessage(true);}
void testDestroyEnqueuedMessage(bool async)
{
BdbMessageStore store;
@@ -452,7 +456,7 @@
store.destroy(*msg);
string loaded;
- store.loadContent(*msg, loaded, 0, data.length());
+ store.loadContent(queue, *msg, loaded, 0, data.length());
CPPUNIT_ASSERT_EQUAL(data, loaded);
store.dequeue(0, *msg, queue);