Author: kpvdr
Date: 2007-11-26 16:50:03 -0500 (Mon, 26 Nov 2007)
New Revision: 1361
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/jrnl/nlfh.cpp
store/trunk/cpp/tests/SimpleTest.cpp
Log:
Switched all regular PersistableMessage* and PersistableMessage& to
intrusive_ptr<PersistableMessage>, so as to hook into the refcount for a message
while it is in the store. Matches changes made in qpid.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-26 21:09:41 UTC (rev 1360)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-26 21:50:03 UTC (rev 1361)
@@ -35,6 +35,7 @@
using namespace rhm::bdbstore;
using namespace qpid::broker;
using boost::static_pointer_cast;
+using boost::intrusive_ptr;
using std::auto_ptr;
using std::max;
@@ -719,19 +720,19 @@
readXids(prepareXidDb, xids);
}
-void BdbMessageStore::stage( PersistableMessage& msg)
+void BdbMessageStore::stage( intrusive_ptr<PersistableMessage>& msg)
{
checkInit();
TxnCtxt txn;
txn.begin(env);
- u_int64_t messageId (msg.getPersistenceId());
- if (messageId == 0 || !msg.isContentReleased()) {
+ u_int64_t messageId (msg->getPersistenceId());
+ if (messageId == 0 || !msg->isContentReleased()) {
try {
Dbt key (&messageId, sizeof(messageId));
messageId = messageIdSequence.next();
store(NULL, &txn, key, msg, true);
- msg.setPersistenceId(messageId);
+ msg->setPersistenceId(messageId);
txn.commit();
} catch (std::exception& e) {
txn.abort();
@@ -739,10 +740,10 @@
}
}
}
-void BdbMessageStore::destroy(PersistableMessage& msg)
+void BdbMessageStore::destroy(intrusive_ptr<PersistableMessage>& msg)
{
checkInit();
- u_int64_t messageId (msg.getPersistenceId());
+ u_int64_t messageId (msg->getPersistenceId());
if (messageId) {
Dbt key (&messageId, sizeof(messageId));
TxnCtxt txn;
@@ -773,10 +774,10 @@
return peek.get_size();
}
-void BdbMessageStore::appendContent(const PersistableMessage& msg, const
std::string& data)
+void BdbMessageStore::appendContent(intrusive_ptr<const PersistableMessage>&
msg, const std::string& data)
{
checkInit();
- u_int64_t messageId (msg.getPersistenceId());
+ u_int64_t messageId (msg->getPersistenceId());
if (messageId != 0) {
try {
Dbt key (&messageId, sizeof(messageId));
@@ -803,11 +804,11 @@
}
void BdbMessageStore::loadContent(const qpid::broker::PersistableQueue& queue,
- const PersistableMessage& msg, std::string& data, u_int64_t offset,
u_int32_t length)
+ intrusive_ptr<const PersistableMessage>& msg, std::string&
data, u_int64_t offset, u_int32_t length)
{
checkInit();
- u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/+
msg.encodedHeaderSize();
- u_int64_t messageId (msg.getPersistenceId());
+ u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/+
msg->encodedHeaderSize();
+ u_int64_t messageId (msg->getPersistenceId());
if (messageId != 0) {
try {
@@ -857,11 +858,12 @@
}
}
-void BdbMessageStore::enqueue(TransactionContext* ctxt, PersistableMessage& msg,
const PersistableQueue& queue)
+void BdbMessageStore::enqueue(TransactionContext* ctxt,
intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
{
checkInit();
u_int64_t queueId (queue.getPersistenceId());
- u_int64_t messageId (msg.getPersistenceId());
+ u_int64_t messageId (msg->getPersistenceId());
if (queueId == 0) {
THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());
}
@@ -882,7 +884,7 @@
bool newId = false;
if (messageId == 0) {
messageId = messageIdSequence.next();
- msg.setPersistenceId(messageId);
+ msg->setPersistenceId(messageId);
newId = true;
}
store(&queue, txn, key, msg, newId);
@@ -890,9 +892,9 @@
if (usingJrnl()){
// add queue* to the txn map..
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- if (msg.isContentReleased()) put(mappingDb, txn->get(), key, value); // TODO -
remove once jrnl is used for transient policy see **
+ if (msg->isContentReleased()) put(mappingDb, txn->get(), key, value); // TODO -
remove once jrnl is used for transient policy see **
}else{
- msg.enqueueComplete(); // set enqueued for ack
+ msg->enqueueComplete(); // set enqueued for ack
put(mappingDb, txn->get(), key, value);
// cct if using Journal do we need to wait for IO to complete before calling thus???
@@ -911,27 +913,28 @@
void BdbMessageStore::store(const PersistableQueue* queue,
TxnCtxt* txn, Dbt& messageId,
- PersistableMessage& message,
+ intrusive_ptr<PersistableMessage>& message,
bool newId)
{
- u_int32_t headerSize = message.encodedHeaderSize();
- u_int64_t size = message.encodedSize() + sizeof(u_int32_t);
+ u_int32_t headerSize = message->encodedHeaderSize();
+ u_int64_t size = message->encodedSize() + sizeof(u_int32_t);
char* buff= 0;
- if (!message.isContentReleased() )
+ if (!message->isContentReleased() )
{
buff = static_cast<char*>(::alloca(size)); // long + headers + content
Buffer buffer(buff,size);
buffer.putLong(headerSize);
- message.encode(buffer);
+ message->encode(buffer);
}
try {
if ( queue && usingJrnl()){
+//std::cout << "E" << std::flush;
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->ref();
- dtokp->setSourceMessage (&message);
- dtokp->set_rid(message.getPersistenceId()); // set the messageID into the Journal
header (record-id)
+ dtokp->setSourceMessage (message.get());
+ dtokp->set_rid(message->getPersistenceId()); // set the messageID into the
Journal header (record-id)
bool written = false;
unsigned aio_sleep_cnt = 0;
@@ -941,13 +944,13 @@
JournalImpl* jc =
static_cast<JournalImpl*>(queue->getExternalQueueStore());
rhm::journal::iores eres;
if (txn->getXid().empty()){
- if (message.isContentReleased()){
+ if (message->isContentReleased()){
eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
}else {
eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
}
}else {
- if (message.isContentReleased()){
+ if (message->isContentReleased()){
eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(),
false);
} else {
eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(),
txn->getXid(), false);
@@ -956,18 +959,21 @@
switch (eres)
{
case rhm::journal::RHM_IORES_SUCCESS:
+//std::cout << "." << std::flush;
if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
written = true;
aio_sleep_cnt = 0;
busy_sleep_cnt = 0;
break;
case rhm::journal::RHM_IORES_AIO_WAIT:
+//std::cout << "w" << std::flush;
if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
THROW_STORE_EXCEPTION("Timeout waiting for AIO:
RHM_IORES_AIO_WAIT");
usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get
events
jc->get_wr_events();
break;
case rhm::journal::RHM_IORES_BUSY:
+//std::cout << "b" << std::flush;
if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
THROW_STORE_EXCEPTION("Timeout waiting for mutex:
RHM_IORES_BUSY");
usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call
as option
@@ -997,11 +1003,12 @@
}
}
-void BdbMessageStore::dequeue(TransactionContext* ctxt, PersistableMessage& msg,
const PersistableQueue& queue)
+void BdbMessageStore::dequeue(TransactionContext* ctxt,
intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
{
checkInit();
u_int64_t queueId (queue.getPersistenceId());
- u_int64_t messageId (msg.getPersistenceId());
+ u_int64_t messageId (msg->getPersistenceId());
if (messageId == 0) {
THROW_STORE_EXCEPTION("Error dequeing message, persistence id not
set");
}
@@ -1025,16 +1032,16 @@
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
async_dequeue(ctxt, msg, queue);
// added here as we are not doing it async on call back
- if (msg.isContentReleased()) // TODO remove this code once jrnl is used for transient
policy see **
+ if (msg->isContentReleased()) // TODO remove this code once jrnl is used for
transient policy see **
{
Dbt key (&messageId, sizeof(messageId));
Dbt value (&queueId, sizeof(queueId));
dequeue(txn->get(), key, value);
}
- msg.dequeueComplete();
- if ( msg.isDequeueComplete() ) // clear id after last dequeue
- msg.setPersistenceId(0);
+ msg->dequeueComplete();
+ if ( msg->isDequeueComplete() ) // clear id after last dequeue
+ msg->setPersistenceId(0);
} else if (txn->isTPC()) {
//if this is part of a 2pc transaction, then only record the dequeue now,
@@ -1044,8 +1051,8 @@
Dbt key (&messageId, sizeof(messageId));
Dbt value (&queueId, sizeof(queueId));
if (dequeue(txn->get(), key, value)) {
- msg.setPersistenceId(0);//clear id as we have now removed the message
from the store
- msg.dequeueComplete(); // set dequeued for ack
+ msg->setPersistenceId(0);//clear id as we have now removed the message
from the store
+ msg->dequeueComplete(); // set dequeued for ack
}
}
if (!ctxt) txn->commit();
@@ -1059,14 +1066,16 @@
}
}
-void BdbMessageStore::async_dequeue(TransactionContext* ctxt, PersistableMessage&
msg, const PersistableQueue& queue)
+void BdbMessageStore::async_dequeue(TransactionContext* ctxt,
intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
{
+//std::cout << "D" << std::flush;
bool written = false;
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
ddtokp->ref();
- ddtokp->setSourceMessage (&msg);
+ ddtokp->setSourceMessage (msg.get());
ddtokp->set_rid(messageIdSequence.next());
- ddtokp->set_dequeue_rid(msg.getPersistenceId());
+ ddtokp->set_dequeue_rid(msg->getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
string tid;
@@ -1092,17 +1101,20 @@
switch (dres)
{
case rhm::journal::RHM_IORES_SUCCESS:
+//std::cout << "." << std::flush;
aio_sleep_cnt = 0;
busy_sleep_cnt = 0;
written = true;
break;
case rhm::journal::RHM_IORES_AIO_WAIT:
+//std::cout << "w" << std::flush;
if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
THROW_STORE_EXCEPTION("Timeout waiting for AIO:
RHM_IORES_AIO_WAIT");
usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as
option
jc->get_wr_events();
break;
case rhm::journal::RHM_IORES_BUSY:
+//std::cout << "b" << std::flush;
if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
THROW_STORE_EXCEPTION("Timeout waiting for mutex:
RHM_IORES_BUSY");
usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as
option
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-11-26 21:09:41 UTC (rev 1360)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-11-26 21:50:03 UTC (rev 1361)
@@ -39,6 +39,7 @@
#include <set>
#include <iostream>
#include <boost/format.hpp>
+#include <boost/intrusive_ptr.hpp>
#include <boost/ptr_container/ptr_list.hpp>
#include "JournalImpl.h"
#include "DataTokenImpl.h"
@@ -82,31 +83,31 @@
bool mode(const bool mode, const bool force);
void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager&
recovery, queue_index& index,
- txn_list& locked, message_index& messages);
+ txn_list& locked, message_index& messages);
void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager&
recovery, queue_index& index,
- txn_list& locked, message_index& prepared);
+ txn_list& locked, message_index& prepared);
void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager&
recovery,
- qpid::broker::RecoverableQueue::shared_ptr& queue,
- txn_list& locked, message_index& prepared);
+ qpid::broker::RecoverableQueue::shared_ptr& queue,
+ txn_list& locked, message_index& prepared);
qpid::broker::RecoverableMessage::shared_ptr
getExternMessage(qpid::broker::RecoveryManager& recovery,
- uint64_t mId, unsigned& headerSize);
+ uint64_t mId, unsigned& headerSize);
void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager&
recovery, exchange_index& index);
void recoverBindings(TxnCtxt& txn, exchange_index& exchanges,
queue_index& queues);
int enqueueMessage(TxnCtxt& txn, IdDbt& msgId,
qpid::broker::RecoverableMessage::shared_ptr& msg,
- queue_index& index, txn_list& locked,
message_index& prepared);
+ queue_index& index, txn_list& locked, message_index&
prepared);
void recoverXids(txn_list& txns);
void readXids(Db& db, std::set<string>& xids);
void readLockedMappings(Db& db, txn_lock_map& mappings);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn,
- Dbt& messageId,
- qpid::broker::PersistableMessage& message,
- bool newId);
+ Dbt& messageId,
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
+ bool newId);
void enqueue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
bool dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
void async_dequeue(qpid::broker::TransactionContext* ctxt,
- qpid::broker::PersistableMessage& msg,
- const qpid::broker::PersistableQueue& queue);
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
void destroy(Db& db, const qpid::broker::Persistable& p);
@@ -142,26 +143,27 @@
void destroy(const qpid::broker::PersistableExchange& queue);
void bind(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key, const qpid::framing::FieldTable&
args);
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key, const qpid::framing::FieldTable&
args);
void unbind(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key, const qpid::framing::FieldTable&
args);
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key, const qpid::framing::FieldTable&
args);
void recover(qpid::broker::RecoveryManager& queues);
- void stage(qpid::broker::PersistableMessage& msg);
- void destroy(qpid::broker::PersistableMessage& msg);
- void appendContent(const qpid::broker::PersistableMessage& msg, const
std::string& data);
- void loadContent(const qpid::broker::PersistableQueue& queue, const
qpid::broker::PersistableMessage& msg,
- std::string& data, u_int64_t offset, u_int32_t length);
+ void stage(boost::intrusive_ptr<qpid::broker::PersistableMessage>&
msg);
+ void
destroy(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+ void appendContent(boost::intrusive_ptr<const
qpid::broker::PersistableMessage>& msg, const std::string& data);
+ void loadContent(const qpid::broker::PersistableQueue& queue,
+ boost::intrusive_ptr<const
qpid::broker::PersistableMessage>& msg,
+ std::string& data, u_int64_t offset, u_int32_t length);
void enqueue(qpid::broker::TransactionContext* ctxt,
- qpid::broker::PersistableMessage& msg,
- const qpid::broker::PersistableQueue& queue);
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>&
msg,
+ const qpid::broker::PersistableQueue& queue);
void dequeue(qpid::broker::TransactionContext* ctxt,
- qpid::broker::PersistableMessage& msg,
- const qpid::broker::PersistableQueue& queue);
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>&
msg,
+ const qpid::broker::PersistableQueue& queue);
void flush(const qpid::broker::PersistableQueue& queue);
u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-11-26 21:09:41 UTC (rev 1360)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-11-26 21:50:03 UTC (rev 1361)
@@ -209,6 +209,7 @@
}
_rec_enqcnt = ro->_enq_cnt_list[_fid];
return true;
+ //return _fid == ro->_ffid ? ro->_full : true;
}
}
#ifndef RHM_WRONLY
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2007-11-26 21:09:41 UTC (rev 1360)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2007-11-26 21:50:03 UTC (rev 1361)
@@ -40,6 +40,8 @@
using boost::static_pointer_cast;
using boost::dynamic_pointer_cast;
+using boost::intrusive_ptr;
+using boost::static_pointer_cast;
using namespace qpid;
using namespace rhm::bdbstore;
using namespace qpid::broker;
@@ -332,16 +334,18 @@
//create & stage a message
Message::shared_ptr msg = MessageUtils::createMessage(exchange, routingKey,
messageId, (data1.size() + data2.size()));
+ intrusive_ptr<PersistableMessage> pmsg =
static_pointer_cast<PersistableMessage>(msg);
+ intrusive_ptr<const PersistableMessage> cpmsg =
static_pointer_cast<const PersistableMessage>(msg);
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
FieldTable table;
table.setString("abc", "xyz");
msg->getProperties<MessageProperties>()->setApplicationHeaders(table);
- store.stage(*msg);
+ store.stage(pmsg);
//append to it
msg->releaseContent(&store);//ensure that data is not held in memory
but is appended to disk when added
- store.appendContent(*msg, data1);
- store.appendContent(*msg, data2);
+ store.appendContent(cpmsg, data1);
+ store.appendContent(cpmsg, data2);
//AMQContentBody part1(data1);
//msg->addContent(&part1);FIXME
@@ -424,15 +428,17 @@
const string data("abcdefg");
Message::shared_ptr msg(MessageUtils::createMessage("my_exchange",
"my_routing_key", "my_message", data.length()));
+ intrusive_ptr<PersistableMessage> pmsg =
static_pointer_cast<PersistableMessage>(msg);
+ intrusive_ptr<const PersistableMessage> cpmsg =
static_pointer_cast<const PersistableMessage>(msg);
MessageUtils::addContent(msg, data);
- store.stage(*msg);
- store.destroy(*msg);
+ store.stage(pmsg);
+ store.destroy(pmsg);
try {
string loaded;
Queue queue("dummy", 0, &store, 0);
- store.loadContent(queue, *msg, loaded, 0, data.length());
+ store.loadContent(queue, cpmsg, loaded, 0, data.length());
CPPUNIT_ASSERT(false);
} catch (StoreException& e) {
}
@@ -448,19 +454,21 @@
const string data("abcdefg");
Message::shared_ptr msg(MessageUtils::createMessage("my_exchange",
"my_routing_key", "my_message", data.length()));
+ intrusive_ptr<PersistableMessage> pmsg =
static_pointer_cast<PersistableMessage>(msg);
+ intrusive_ptr<const PersistableMessage> cpmsg =
static_pointer_cast<const PersistableMessage>(msg);
MessageUtils::addContent(msg, data);
Queue queue("my_queue", 0, &store, 0);
store.create(queue);
- store.enqueue(0, *msg, queue);
- store.destroy(*msg);
+ store.enqueue(0, pmsg, queue);
+ store.destroy(pmsg);
string loaded;
- store.loadContent(queue, *msg, loaded, 0, data.length());
+ store.loadContent(queue, cpmsg, loaded, 0, data.length());
CPPUNIT_ASSERT_EQUAL(data, loaded);
- store.dequeue(0, *msg, queue);
+ store.dequeue(0, pmsg, queue);
store.destroy(queue);
}