Author: cctrieloff
Date: 2007-11-07 10:05:52 -0500 (Wed, 07 Nov 2007)
New Revision: 1258
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
Log:
- staging support
- no staged enqueue, content released, then loaded not yet supported in async mode
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-06 21:52:47 UTC (rev 1257)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-07 15:05:52 UTC (rev 1258)
@@ -492,16 +492,22 @@
{
case rhm::journal::RHM_IORES_SUCCESS:{
msg_count++;
- char* data = (char*)dbuff;
- unsigned headerSize = Buffer(data, preambleLength).getLong();
- Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
-
- RecoverableMessage::shared_ptr msg = recovery.recoverMessage(headerBuff);
+ RecoverableMessage::shared_ptr msg;
+ char* data = (char*)dbuff;
+
+ unsigned headerSize;
+ if (externalFlag){
+ msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl
+ } else {
+ headerSize = Buffer(data, preambleLength).getLong();
+ Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+ msg = recovery.recoverMessage(headerBuff);
+ }
msg->setPersistenceId(dtokp.rid());
u_int32_t contentOffset = headerSize + preambleLength;
u_int64_t contentSize = readSize - contentOffset;
- if (msg->loadContent(contentSize)) {
+ if (msg->loadContent(contentSize) && !externalFlag) {
//now read the content
Buffer contentBuff(data + contentOffset, contentSize);
msg->decodeContent(contentBuff);
@@ -545,6 +551,29 @@
}
+RecoverableMessage::shared_ptr
BdbMessageStore::getExternMessage(qpid::broker::RecoveryManager& recovery,
+ uint64_t messageId, unsigned& headerSize)
+{
+ Dbt key (&messageId, sizeof(messageId));
+ size_t preamble_length = sizeof(u_int32_t)/*header size*/;
+
+ BufferValue value(preamble_length, 0);
+ value.buffer.record();
+ if (messageDb.get(0, &key, &value, 0) == DB_NOTFOUND) {
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ }
+ //read header only to begin with
+ headerSize = value.buffer.getLong();
+
+ BufferValue header(headerSize, preamble_length);
+ if (messageDb.get(0, &key, &header, 0) == DB_NOTFOUND) {
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ }
+
+ return recovery.recoverMessage(header.buffer);
+}
+
+
// bdb version
void BdbMessageStore::recoverMessages(TxnCtxt& txn, RecoveryManager& recovery,
queue_index& index,
txn_list& locked, message_index& prepared)
@@ -700,13 +729,13 @@
txn.begin(env);
u_int64_t messageId (msg.getPersistenceId());
- if (messageId == 0) {
+ if (messageId == 0 || !msg.isContentReleased()) {
try {
Dbt key (&messageId, sizeof(messageId));
messageId = messageIdSequence.next();
store(NULL, &txn, key, msg, true);
msg.setPersistenceId(messageId);
- txn.commit();
+ txn.commit();
} catch (std::exception& e) {
txn.abort();
throw e;
@@ -813,8 +842,6 @@
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
if (jc){
jc->flush();
-// ::usleep(10000); /////////////// hack ----------- FIX!!
-// jc->get_wr_events();
}
}catch ( journal::jexception& e) {
THROW_STORE_EXCEPTION("Flush failed: " + e.to_string() );
@@ -854,6 +881,7 @@
if (usingJrnl()){
// add queue* to the txn map..
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+ if (msg.isContentReleased()) put(mappingDb, txn->get(), key, value); // TODO - remove once jrnl is used for transient policy see **
}else{
msg.enqueueComplete(); // set enqueued for ack
put(mappingDb, txn->get(), key, value);
@@ -879,10 +907,14 @@
{
u_int32_t headerSize = message.encodedHeaderSize();
u_int64_t size = message.encodedSize() + sizeof(u_int32_t);
- char* buff = static_cast<char*>(::alloca(size)); // long + headers + content
- Buffer buffer(buff,size);
- buffer.putLong(headerSize);
- message.encode(buffer);
+ char* buff= 0;
+ if (!message.isContentReleased() )
+ {
+ buff = static_cast<char*>(::alloca(size)); // long + headers + content
+ Buffer buffer(buff,size);
+ buffer.putLong(headerSize);
+ message.encode(buffer);
+ }
try {
@@ -898,9 +930,17 @@
JournalImpl* jc =
static_cast<JournalImpl*>(queue->getExternalQueueStore());
rhm::journal::iores eres;
if (txn->getXid().empty()){
- eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
+ if (message.isContentReleased()){
+ eres = jc->enqueue_extern_data_record(0, dtokp.get(), false);
+ }else {
+ eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
+ }
}else {
- eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(),
txn->getXid(), false);
+ if (message.isContentReleased()){
+ eres = jc->enqueue_extern_txn_data_record(0, dtokp.get(), txn->getXid(),
false);
+ } else {
+ eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(),
txn->getXid(), false);
+ }
}
switch (eres)
{
@@ -963,6 +1003,13 @@
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
async_dequeue(ctxt, msg, queue);
// added here as we are not doing it async on call back
+ if (msg.isContentReleased()) // TODO remove this code once jrnl is used for transient policy see **
+ {
+ Dbt key (&messageId, sizeof(messageId));
+ Dbt value (&queueId, sizeof(queueId));
+ dequeue(txn->get(), key, value);
+ }
+
msg.dequeueComplete();
if ( msg.isDequeueComplete() ) // clear id after last dequeue
msg.setPersistenceId(0);
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-11-06 21:52:47 UTC (rev 1257)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-11-07 15:05:52 UTC (rev 1258)
@@ -88,6 +88,8 @@
void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager&
recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue,
txn_list& locked, message_index& prepared);
+ qpid::broker::RecoverableMessage::shared_ptr
getExternMessage(qpid::broker::RecoveryManager& recovery,
+ uint64_t mId, unsigned& headerSize);
void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager&
recovery, exchange_index& index);
void recoverBindings(TxnCtxt& txn, exchange_index& exchanges,
queue_index& queues);
int enqueueMessage(TxnCtxt& txn, IdDbt& msgId,
qpid::broker::RecoverableMessage::shared_ptr& msg,