Author: cctrieloff
Date: 2007-09-21 15:12:19 -0400 (Fri, 21 Sep 2007)
New Revision: 938
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
Log:
- Implementation of restore (contains a hack on read, waiting for blocked read
and ** pointer read from kim to remove hack)
- Fixes to Dequeue for AIO
- General bug fixes
- bdb still default store
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-21 16:41:22 UTC (rev 937)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-21 19:12:19 UTC (rev 938)
@@ -44,8 +44,6 @@
static const u_int8_t MESSAGE_MESSAGE = 1;
static const u_int8_t BASIC_MESSAGE = 2;
-// cct delete this !!!!
-// bool hack = false;
BdbMessageStore::BdbMessageStore(const char* envpath) : env(0),
queueDb(&env, 0),
@@ -257,10 +255,9 @@
TxnCtxt txn;
txn.begin(env);
try {
- //read all queues:
- recoverQueues(txn, registry, queues);
- //read all messages:
- recoverMessages(txn, registry, queues, prepared, messages);
+ //read all queues, calls recoversMessages
+ recoverQueues(txn, registry, queues, prepared, messages);
+
//recover exchange & bindings:
recoverExchanges(txn, registry, exchanges);
recoverBindings(txn, exchanges, queues);
@@ -289,7 +286,8 @@
registry.recoveryComplete();
}
-void BdbMessageStore::recoverQueues(TxnCtxt& txn, RecoveryManager& registry,
queue_index& index)
+void BdbMessageStore::recoverQueues(TxnCtxt& txn, RecoveryManager& registry,
queue_index& queue_index, txn_list&
+prepared, message_index& messages)
{
Cursor queues;
queues.open(queueDb, txn.get());
@@ -306,28 +304,32 @@
//set the persistenceId and update max as required
queue->setPersistenceId(key.id);
- const char* queueName = queue->getName().c_str();
-
if (usingJrnl())
{
-
+ const char* queueName = queue->getName().c_str();
journal::jcntl* jQueue = new journal::jcntl(queueName,
getJrnlDir(queueName), string("JournalData"));
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
- try
- {
- jQueue->recover();
- } catch (journal::jexception& e) {
- std::string s;
+ try
+ {
+ jQueue->recover(); // start recovery
+ recoverMessages(txn, registry, queue, prepared, messages);
+ jQueue->recovered(); // start journal.
+ } catch (journal::jexception& e) {
+ std::string s;
THROW_STORE_EXCEPTION(e.to_string(s) + queueName);
}
+ //read all messages: done on a per queue basis if using Journal
}
-
- index[key.id] = queue;
+ queue_index[key.id] = queue;
maxQueueId = max(key.id, maxQueueId);
}
queueIdSequence.reset(maxQueueId + 1);
+
+ if (!usingJrnl()) //read all messages:
+ recoverMessages(txn, registry, queue_index, prepared, messages);
+
}
@@ -385,6 +387,98 @@
}
}
+// async IO version.
+void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/,
qpid::broker::RecoveryManager& recovery,
+ qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked,
message_index& prepared)
+{
+
+ size_t preambleLength = sizeof(u_int32_t)/*header size*/;
+ u_int64_t maxMessageId(1);
+
+ journal::jcntl* jc =
static_cast<journal::jcntl*>(queue->getExternalQueueStore());
+ journal::data_tok dtokp;
+ size_t readSize = 0;
+// char** buff = 0;
+ unsigned aio_sleep_cnt = 0;
+ unsigned msg_count=0;
+ bool read = true;
+
+// hack...
+ char buff [1024]; size_t buffSize = 1024;
+
+
+ dtokp.set_wstate(rhm::journal::data_tok::ENQ);
+ // read the message from the Journal.
+ while (read) {
+
+std:: cout << "loop -- uses fixed size -> FIX <-" <<
std::endl;
+
+// const iores read_data(void** dbuf, size_t& dbsize, data_tok* const dtok)
+ rhm::journal::iores res;
+ try {
+ res = jc->read_data(&buff, buffSize, &dtokp);
+ } catch (rhm::journal::jexception& e) {
+ std::cout << "recover read" << e << std::endl;
+ std::string str;
+ THROW_STORE_EXCEPTION("Error dequeuing message" + e.to_string(str));
+ }
+ readSize = dtokp.dsize();
+ assert(readSize < buffSize); /// fail safe for hack...
+
+ switch (res)
+ {
+ case rhm::journal::RHM_IORES_SUCCESS:{
+ msg_count++;
+ char* data = buff;
+ unsigned headerSize = Buffer(data, preambleLength).getLong();
+ Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read
size or header size ????
+
+ RecoverableMessage::shared_ptr msg = recovery.recoverMessage(headerBuff);
+ msg->setPersistenceId(dtokp.rid());
+
+ u_int32_t contentOffset = headerSize + preambleLength;
+ u_int64_t contentSize = readSize - contentOffset;
+
+ if (msg->loadContent(contentSize)) {
+ //now read the content
+ Buffer contentBuff(data + contentOffset, contentSize);
+ msg->decodeContent(contentBuff);
+ }
+ if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(),
dtokp.rid())) {
+ prepared[dtokp.rid()] = msg;
+ } else {
+ queue->recover(msg);
+ }
+
+ if (dtokp.rid() > maxMessageId) {
+ maxMessageId = dtokp.rid();
+ }
+
+ dtokp.reset();
+ dtokp.set_wstate(rhm::journal::data_tok::ENQ);
+ break;
+ }
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ {
+ THROW_STORE_EXCEPTION("Store error, disk time out on recover
for:" + queue->getName());
+ }
+ ::usleep(AIO_SLEEP_TIME);
+ break;
+ case rhm::journal::RHM_IORES_EMPTY:
+ read = false;
+ // inline const u_int32_t get_enq_cnt() const { return _emap.size(); }
+ assert (jc->get_enq_cnt() == msg_count);
+ break; // done with all messages. ((add call in jrnl to test that _emap is empty.
+ default:
+ assert( "Store Error: Unexpected msg state");
+ }
+ }
+ messageIdSequence.reset(maxMessageId + 1);
+
+}
+
+// bdb version
void BdbMessageStore::recoverMessages(TxnCtxt& txn, RecoveryManager& recovery,
queue_index& index,
txn_list& locked, message_index& prepared)
{
@@ -659,7 +753,7 @@
}
store(&queue, txn->get(), key, msg, newId);
- if (/*!hack || */ !usingJrnl()){
+ if (!usingJrnl()){
msg.enqueueComplete(); // set enqueued for ack
put(mappingDb, txn->get(), key, value);
}
@@ -691,25 +785,21 @@
//buffer.flip();
+ journal::data_tok* dtokp = NULL;
try {
- if (/*hack &&*/ queue && usingJrnl()){
-
- if (queue){
- // cct TODO -- delete this in the callback...
- journal::data_tok* dtokp = new journal::data_tok();
- dtokp->setSourceMessage (&message);
+ if ( queue && usingJrnl()){
+ dtokp = new journal::data_tok;
+ // cct TODO -- delete this in the callback...
+ dtokp->setSourceMessage (&message);
dtokp->set_rid(message.getPersistenceId()); // set the messageID into the Journal
header (record-id)
unsigned aio_sleep_cnt = 0;
bool written = false;
while (!written)
{
- journal::jcntl* jc =
static_cast<journal::jcntl*>(queue->getExternalQueueStore());
- char text[4];
- strcpy(text,"123");
-
- rhm::journal::iores eres = jc->enqueue_data(&text, 3, dtokp);
+ journal::jcntl* jc =
static_cast<journal::jcntl*>(queue->getExternalQueueStore());
+ rhm::journal::iores eres = jc->enqueue_data(buff, size, dtokp);
switch (eres)
{
case rhm::journal::RHM_IORES_SUCCESS:
@@ -717,32 +807,36 @@
written = true;
break;
case rhm::journal::RHM_IORES_AIO_WAIT:
- if (aio_sleep_cnt >= MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Error storing message -- AIO timeout for: " +
queue->getName());
+ if (aio_sleep_cnt >= MAX_AIO_SLEEPS){
+ delete dtokp;
+ THROW_STORE_EXCEPTION("Error storing message -- AIO timeout for:
" + queue->getName());
+ }
usleep(AIO_SLEEP_TIME);
jc->get_wr_events();
aio_sleep_cnt++;
break;
case rhm::journal::RHM_IORES_FULL:
- THROW_STORE_FULL_EXCEPTION("Error storing message -- Journal full :"
+ queue->getName());
+ delete dtokp;
+ THROW_STORE_FULL_EXCEPTION("Error storing message -- Journal full
:" + queue->getName());
break;
default:
- assert( "Unexpected msg state");
+ delete dtokp;
+ assert( "Store Error: Unexpected msg state");
}
- }
- }
+ }
- } else {
+ } else {
/// cct message db
if (newId){ // only store in Bd if first time message is stored
Dbt data(buff,size);
- messageDb.put(txn, &messageId, &data, DB_NOOVERWRITE);
- }
- }
+ messageDb.put(txn, &messageId, &data, DB_NOOVERWRITE);
+ }
+ }
}catch ( journal::jexception& e) {
std::string str;
- std::cout << "----" << e.to_string(str) << std::endl;
- THROW_STORE_EXCEPTION("Truncate clean up failed: " +e.to_string(str) );
+ std::cout << "-------------" << e << std::endl;
+ if (dtokp) delete dtokp;
+ THROW_STORE_EXCEPTION("Enqueue failed: " +e.to_string(str) );
}catch (DbException& e) {
THROW_STORE_EXCEPTION_2("Error storing message", e);
}
@@ -777,9 +871,13 @@
Dbt key (&messageId, sizeof(messageId));
Dbt value (&queueId, sizeof(queueId));
- if (dequeue(txn->get(), key, value)) {
- msg.setPersistenceId(0);//clear id as we have now removed the message
from the store
- msg.dequeueComplete(); // set dequeued for ack
+ if (usingJrnl()){
+ async_dequeue(ctxt, msg, queue);
+ }else{
+ if (dequeue(txn->get(), key, value)) {
+ msg.setPersistenceId(0);//clear id as we have now removed the
message from the store
+ msg.dequeueComplete(); // set dequeued for ack
+ }
}
}
if (!ctxt) txn->commit();
@@ -793,6 +891,48 @@
}
}
+void BdbMessageStore::async_dequeue(TransactionContext* /*ctxt*/, PersistableMessage&
msg, const PersistableQueue& queue)
+{
+ unsigned aio_sleep_cnt = 0;
+ bool written = false;
+ journal::data_tok* ddtokp = new journal::data_tok;
+ ddtokp->setSourceMessage (&msg);
+ ddtokp->set_rid(msg.getPersistenceId()); // message id to be dequeued
+ ddtokp->set_dequeue_rid(messageIdSequence.next());
+ ddtokp->set_wstate(journal::data_tok::ENQ);
+ journal::jcntl* jc = static_cast<journal::jcntl*>(queue.getExternalQueueStore());
+ while (!written)
+ {
+ rhm::journal::iores dres;
+ try {
+ dres = jc->dequeue_data(ddtokp);
+ } catch (rhm::journal::jexception& e) {
+ std::string str;
+ delete ddtokp;
+ THROW_STORE_EXCEPTION("Error dequeuing message" + e.to_string(str));
+ }
+ switch (dres)
+ {
+ case rhm::journal::RHM_IORES_SUCCESS:
+
+ written = true;
+ break;
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ if (aio_sleep_cnt >= MAX_AIO_SLEEPS){
+ delete ddtokp;
+ THROW_STORE_EXCEPTION("Error dequeuing message -- AIO timeout for:
" + queue.getName());
+ }
+ jc->get_wr_events();
+ usleep(AIO_SLEEP_TIME);
+ aio_sleep_cnt++;
+ break;
+ default:
+ delete ddtokp;
+ assert( "Store Error: Unexpected msg state");
+ }
+ }
+}
+
bool BdbMessageStore::dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId)
{
Cursor cursor;
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-09-21 16:41:22 UTC (rev 937)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-09-21 19:12:19 UTC (rev 938)
@@ -74,9 +74,13 @@
IdSequence exchangeIdSequence;
IdSequence messageIdSequence;
- void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager&
recovery, queue_index& index);
+ void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager&
recovery, queue_index& index,
+ txn_list& locked, message_index& messages);
void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager&
recovery, queue_index& index,
txn_list& locked, message_index& prepared);
+ void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager&
recovery,
+ qpid::broker::RecoverableQueue::shared_ptr& queue,
+ txn_list& locked, message_index& prepared);
void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager&
recovery, exchange_index& index);
void recoverBindings(TxnCtxt& txn, exchange_index& exchanges,
queue_index& queues);
int enqueueMessage(TxnCtxt& txn, IdDbt& msgId,
qpid::broker::RecoverableMessage::shared_ptr& msg,
@@ -91,6 +95,9 @@
bool newId);
void enqueue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
bool dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
+ void async_dequeue(qpid::broker::TransactionContext* ctxt,
+ qpid::broker::PersistableMessage& msg,
+ const qpid::broker::PersistableQueue& queue);
bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
void destroy(Db& db, const qpid::broker::Persistable& p);