Author: cctrieloff
Date: 2007-09-28 16:56:24 -0400 (Fri, 28 Sep 2007)
New Revision: 953
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/TxnCtxt.h
Log:
- txn support for async IO
- some refactoring to reduce line count
- TODO:
  - pass the xid list to jc recover once it supports it
  - read the msg list back from jc for in-doubt txns and populate
  - move the xid jc functions
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-28 16:08:00 UTC (rev 952)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-28 20:56:24 UTC (rev 953)
@@ -30,7 +30,6 @@
#include "BindingDbt.h"
#include "IdPairDbt.h"
#include "StringDbt.h"
-#include <jrnl/jcntl.hpp>
using namespace rhm::bdbstore;
using namespace qpid::broker;
@@ -44,7 +43,9 @@
static const u_int8_t MESSAGE_MESSAGE = 1;
static const u_int8_t BASIC_MESSAGE = 2;
+unsigned int TxnCtxt::count = 0;
+
BdbMessageStore::BdbMessageStore(const char* envpath) : env(0),
queueDb(&env, 0),
exchangeDb(&env, 0),
@@ -111,13 +112,11 @@
}
txn->commit(0);
-
- if (usingJrnl())
- {
- try{
+ if (usingJrnl()) {
+ try{
journal::jdir::delete_dir(getJrnlBaseDir(),true);
}
- catch ( journal::jexception& e) {
+ catch ( journal::jexception& e) {
std::string str;
THROW_STORE_EXCEPTION("Truncate clean up failed: " +e.to_string(str)
);
}
@@ -129,31 +128,24 @@
if (queue.getPersistenceId()) {
THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
}
-
-
- if (usingJrnl())
- {
+ if (usingJrnl()) {
journal::jcntl* jQueue = new journal::jcntl(queue.getName(), getJrnlDir(queue),
string("JournalData"));
queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
-
- try
- {
- // init will create the deque's for the init...
- jQueue->initialize();
-
- } catch (journal::jexception& e) {
- std::string s;
+ try {
+ // init will create the deque's for the init...
+ jQueue->initialize();
+ } catch (journal::jexception& e) {
+ std::string s;
THROW_STORE_EXCEPTION(e.to_string(s) + queue.getName());
}
+ }
- }
-
try {
if (!create(queueDb, queueIdSequence, queue)) {
- THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
+ THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
}
} catch (DbException& e) {
- THROW_STORE_EXCEPTION_2("Error creating queue named " + queue.getName(),
e);
+ THROW_STORE_EXCEPTION_2("Error creating queue named " + queue.getName(),
e);
}
}
@@ -412,7 +404,7 @@
try {
while (read) {
-std:: cout << "loop -- uses fixed size -> FIX <-" <<
std::endl;
+//std:: cout << "loop -- uses fixed size -> FIX <-" <<
std::endl;
// const iores read_data(void** dbuf, size_t& dbsize, data_tok* const dtok)
rhm::journal::iores res = jc->read_data_record(&buff, buffSize,
&dtokp);
@@ -422,23 +414,24 @@
switch (res)
{
case rhm::journal::RHM_IORES_SUCCESS:{
- msg_count++;
- char* data = buff;
- unsigned headerSize = Buffer(data, preambleLength).getLong();
- Buffer headerBuff(data+ preambleLength, headerSize); /// do we want
read size or header size ????
+ msg_count++;
+ char* data = buff;
+ unsigned headerSize = Buffer(data, preambleLength).getLong();
+ Buffer headerBuff(data+ preambleLength, headerSize); /// do we want
read size or header size ????
- RecoverableMessage::shared_ptr msg = recovery.recoverMessage(headerBuff);
- msg->setPersistenceId(dtokp.rid());
+ RecoverableMessage::shared_ptr msg = recovery.recoverMessage(headerBuff);
+ msg->setPersistenceId(dtokp.rid());
- u_int32_t contentOffset = headerSize + preambleLength;
- u_int64_t contentSize = readSize - contentOffset;
-
- if (msg->loadContent(contentSize)) {
+ u_int32_t contentOffset = headerSize + preambleLength;
+ u_int64_t contentSize = readSize - contentOffset;
+ if (msg->loadContent(contentSize)) {
//now read the content
Buffer contentBuff(data + contentOffset, contentSize);
msg->decodeContent(contentBuff);
- }
- if (PreparedTransaction::isLocked(locked,
queue->getPersistenceId(), dtokp.rid())) {
+ }
+ // TODO - change to prep list based on reading state from journal
+ // -- add to prepared.enqued list..
+ if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(),
dtokp.rid())) {
prepared[dtokp.rid()] = msg;
} else {
queue->recover(msg);
@@ -448,9 +441,9 @@
maxMessageId = dtokp.rid();
}
- dtokp.reset();
- dtokp.set_wstate(rhm::journal::data_tok::ENQ);
- break;
+ dtokp.reset();
+ dtokp.set_wstate(rhm::journal::data_tok::ENQ);
+ break;
}
case rhm::journal::RHM_IORES_AIO_WAIT:
if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
@@ -552,22 +545,25 @@
void BdbMessageStore::recoverXids(txn_list& txns)
{
std::set<string> prepared;
- std::set<string> known;
+ collectPreparedXids(prepared);
- readXids(prepareXidDb, prepared);
- readXids(enqueueXidDb, known);
- readXids(dequeueXidDb, known);
+ if (!usingJrnl()){
+ std::set<string> known;
+ readXids(enqueueXidDb, known);
+ readXids(dequeueXidDb, known);
- //abort all known but unprepared xids:
- for (std::set<string>::iterator i = known.begin(); i != known.end(); i++) {
- if (prepared.find(*i) == prepared.end()) {
- TPCTxnCtxt txn(*i);
- completed(txn, dequeueXidDb, enqueueXidDb);
- }
- }
-
+ //abort all known but unprepared xids:
+ for (std::set<string>::iterator i = known.begin(); i != known.end(); i++) {
+ if (prepared.find(*i) == prepared.end()) {
+ TPCTxnCtxt txn(*i);
+ completed(txn, dequeueXidDb, enqueueXidDb);
+ }
+ }
+ }
txn_lock_map enqueues;
txn_lock_map dequeues;
+
+ //this will be empty when using the journal -- TODO to get from journal.
readLockedMappings(enqueueXidDb, enqueues);
readLockedMappings(dequeueXidDb, dequeues);
@@ -625,7 +621,7 @@
try {
Dbt key (&messageId, sizeof(messageId));
messageId = messageIdSequence.next();
- store(NULL, txn.get(), key, msg, true);
+ store(NULL, &txn, key, msg, true);
msg.setPersistenceId(messageId);
txn.commit();
} catch (std::exception& e) {
@@ -748,20 +744,23 @@
if (messageId == 0) {
messageId = messageIdSequence.next();
msg.setPersistenceId(messageId);
- newId = true;
- }
- store(&queue, txn->get(), key, msg, newId);
+ newId = true;
+ }
+ store(&queue, txn, key, msg, newId);
- if (!usingJrnl()){
+ if (usingJrnl()){
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue);
+ }else{
msg.enqueueComplete(); // set enqueued for ack
put(mappingDb, txn->get(), key, value);
- }
- // cct if using Journal do we need to wait for IO to complete before calling thus???
- // set enqueue comple on callback msg.enqueueComplete();
- if (txn->isTPC()) {
- record2pcOp(enqueueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn),
messageId, queueId);
- }
+ // cct if using Journal do we need to wait for IO to complete before calling thus???
+ // set enqueue comple on callback msg.enqueueComplete();
+ if (txn->isTPC()) {
+ record2pcOp(enqueueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn), messageId,
queueId);
+ }
+ }
if (!ctxt) txn->commit();
} catch (std::exception& e) {
@@ -771,7 +770,7 @@
}
void BdbMessageStore::store(const PersistableQueue* queue,
- DbTxn* txn, Dbt& messageId,
+ TxnCtxt* txn, Dbt& messageId,
PersistableMessage& message,
bool newId)
{
@@ -789,7 +788,7 @@
if ( queue && usingJrnl()){
dtokp = new journal::data_tok;
- // cct TODO -- delete this in the callback...
+ // deleted this in the callback...
dtokp->setSourceMessage (&message);
dtokp->set_rid(message.getPersistenceId()); // set the messageID into the Journal
header (record-id)
@@ -798,7 +797,7 @@
while (!written)
{
journal::jcntl* jc =
static_cast<journal::jcntl*>(queue->getExternalQueueStore());
- rhm::journal::iores eres = jc->enqueue_data_record(buff, size,
dtokp);
+ rhm::journal::iores eres = jc->enqueue_data_record(buff, size, dtokp/*,
txn->getXid(), false*/);
switch (eres)
{
case rhm::journal::RHM_IORES_SUCCESS:
@@ -828,7 +827,7 @@
/// cct message db
if (newId){ // only store in Bd if first time message is stored
Dbt data(buff,size);
- messageDb.put(txn, &messageId, &data, DB_NOOVERWRITE);
+ messageDb.put(txn->get(), &messageId, &data, DB_NOOVERWRITE);
}
}
}catch ( journal::jexception& e) {
@@ -862,22 +861,23 @@
}
try {
- if (txn->isTPC()) {
+
+ if (usingJrnl()){
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue);
+ async_dequeue(ctxt, msg, queue);
+
+ } else if (txn->isTPC()) {
//if this is part of a 2pc transaction, then only record the dequeue now,
//it will be applied on commit
record2pcOp(dequeueXidDb, dynamic_cast<TPCTxnCtxt&>(*txn),
messageId, queueId);
} else {
Dbt key (&messageId, sizeof(messageId));
Dbt value (&queueId, sizeof(queueId));
-
- if (usingJrnl()){
- async_dequeue(ctxt, msg, queue);
- }else{
- if (dequeue(txn->get(), key, value)) {
- msg.setPersistenceId(0);//clear id as we have now removed the
message from the store
- msg.dequeueComplete(); // set dequeued for ack
- }
- }
+ if (dequeue(txn->get(), key, value)) {
+ msg.setPersistenceId(0);//clear id as we have now removed the message
from the store
+ msg.dequeueComplete(); // set dequeued for ack
+ }
}
if (!ctxt) txn->commit();
@@ -890,7 +890,7 @@
}
}
-void BdbMessageStore::async_dequeue(TransactionContext* /*ctxt*/, PersistableMessage&
msg, const PersistableQueue& queue)
+void BdbMessageStore::async_dequeue(TransactionContext* ctxt, PersistableMessage&
msg, const PersistableQueue& queue)
{
unsigned aio_sleep_cnt = 0;
bool written = false;
@@ -900,11 +900,17 @@
ddtokp->set_dequeue_rid(messageIdSequence.next());
ddtokp->set_wstate(journal::data_tok::ENQ);
journal::jcntl* jc = static_cast<journal::jcntl*>(queue.getExternalQueueStore());
+ string tid;
+ if (ctxt){
+ TxnCtxt* txn = check(ctxt);
+ tid = txn->getXid();
+ }
+
while (!written)
{
rhm::journal::iores dres;
try {
- dres = jc->dequeue_data_record(ddtokp);
+ dres = jc->dequeue_data_record(ddtokp, tid);
} catch (rhm::journal::jexception& e) {
std::string str;
delete ddtokp;
@@ -982,23 +988,24 @@
try {
- //scroll through all records matching xid in apply and dequeue
- //using the message and queue id encoded in each value
- Cursor c;
- c.open(apply, txn.get());
-
StringDbt key(txn.getXid());
- IdPairDbt value;
+ if (!usingJrnl()){
+ //scroll through all records matching xid in apply and dequeue
+ //using the message and queue id encoded in each value
+ Cursor c;
+ c.open(apply, txn.get());
+ IdPairDbt value;
- for (int status = c->get(&key, &value, DB_SET); status == 0; status =
c->get(&key, &value, DB_NEXT_DUP)) {
- dequeue(txn.get(), value.message, value.queue);
- }
- c.close();
+ for (int status = c->get(&key, &value, DB_SET); status == 0; status =
c->get(&key, &value, DB_NEXT_DUP)) {
+ dequeue(txn.get(), value.message, value.queue);
+ }
+ c.close();
- //delete all records matching xid
+ //delete all records matching xid
+ discard.del(txn.get(), &key, 0);
+ apply.del(txn.get(), &key, 0);
+ }
prepareXidDb.del(txn.get(), &key, 0);
- discard.del(txn.get(), &key, 0);
- apply.del(txn.get(), &key, 0);
txn.commit();
} catch (std::exception& e) {
@@ -1033,6 +1040,8 @@
Dbt key ((void*) xid.data(), xid.length());
Dbt value(&dummy, sizeof(dummy));
+ // make sure all the data is written to disk before returning
+ txn->sync();
prepareXidDb.put(txn->get(), &key, &value, 0);
txn->commit();
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-09-28 16:08:00 UTC (rev 952)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-09-28 20:56:24 UTC (rev 953)
@@ -39,15 +39,12 @@
#include <iostream>
#include <boost/format.hpp>
#include <boost/ptr_container/ptr_list.hpp>
+#include <jrnl/jcntl.hpp>
namespace rhm {
namespace bdbstore {
using std::string;
- #define MAX_AIO_SLEEPS 300
- #define AIO_SLEEP_TIME 1000
-
-
/**
* An implementation of the MessageStore interface based on Berkeley DB
*/
@@ -89,7 +86,7 @@
void readXids(Db& db, std::set<string>& xids);
void readLockedMappings(Db& db, txn_lock_map& mappings);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
- void store(const qpid::broker::PersistableQueue* queue, DbTxn* txn,
+ void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn,
Dbt& messageId,
qpid::broker::PersistableMessage& message,
bool newId);
@@ -111,13 +108,14 @@
void open(Db& db, DbTxn* txn, const char* file, bool dupKey);
- // journal functions
- void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
- string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple
/var/rhm/ + queueDir/
- string getJrnlDir(const char* queueName);
- inline bool usingJrnl() {return false;} // make configurable
- string getJrnlBaseDir();
+ // journal functions
+ void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
+ string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple
/var/rhm/ + queueDir/
+ string getJrnlDir(const char* queueName);
+ static inline bool usingJrnl() {return false;} // make configurable
+ string getJrnlBaseDir();
+
public:
BdbMessageStore(const char* envpath = 0);
~BdbMessageStore();
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2007-09-28 16:08:00 UTC (rev 952)
+++ store/trunk/cpp/lib/TxnCtxt.h 2007-09-28 20:56:24 UTC (rev 953)
@@ -26,21 +26,101 @@
#include "db-inc.h"
#include <qpid/broker/MessageStore.h>
+#include <qpid/sys/Mutex.h>
+#include <boost/shared_ptr.hpp>
+#include <vector>
+#include <jrnl/jcntl.hpp>
+#include <boost/format.hpp>
namespace rhm{
namespace bdbstore{
+// find a better place to put these
+#define MAX_AIO_SLEEPS 1000
+#define AIO_SLEEP_TIME 1000
+
+
class TxnCtxt : public qpid::broker::TransactionContext
{
- DbTxn* txn;
+private:
+ typedef std::set<const qpid::broker::PersistableQueue*> ipqdef;
+ ipqdef impactedQueues; // list of Queues used in the txn
+ static unsigned int count;
+ mutable qpid::sys::Mutex Lock;
+
+    /**
+     * Returns the next value of the class-wide transaction counter.
+     *
+     * count is static, so the mutex guarding it must be shared by every
+     * TxnCtxt as well. A function-local static mutex is used here because a
+     * per-instance mutex cannot serialise increments made through different
+     * objects.
+     *
+     * @return the incremented counter value.
+     */
+    unsigned int getCount() {
+        static qpid::sys::Mutex countLock;
+        qpid::sys::Mutex::ScopedLock locker(countLock);
+        return ++count;
+    }
+ /**
+ * local txn id, if non XA.
+ */
+ std::string tid;
+ DbTxn* txn;
+
+    /**
+     * Finishes the journal side of this transaction on every queue held in
+     * impactedQueues, then clears that record and calls sync().
+     *
+     * @param commit true calls commit_dtx() on each journal, false calls
+     *               abort_dtx().
+     */
+    void completeTXN(bool commit){
+        for (TxnCtxt::ipqdef::iterator q = impactedQueues.begin(); q != impactedQueues.end(); q++) {
+            journal::jcntl* jrnl = static_cast<journal::jcntl*>((*q)->getExternalQueueStore());
+            if (!jrnl) continue; /* not using journal for this queue */
+            if (commit) {
+                jrnl->commit_dtx(getXid());
+            } else {
+                jrnl->abort_dtx(getXid());
+            }
+        }
+        // NOTE(review): sync() iterates impactedQueues, which deleteXidRecord()
+        // has just emptied -- confirm this ordering is intended.
+        deleteXidRecord();
+        sync();
+    }
+
public:
- TxnCtxt() : txn(0) {}
+
+    /**
+     * Creates a context with no Berkeley DB transaction attached (txn is 0)
+     * and assigns it a local transaction id of the form "rhm-tid<N>", where
+     * N comes from getCount().
+     */
+    TxnCtxt() : txn(0) {
+        // Format the counter into the id. Adding an integer to a string
+        // literal only offsets the char pointer; it never appends digits.
+        tid = boost::str(boost::format("rhm-tid%1%") % getCount());
+    }
+
+    /**
+     * Waits until every journal recorded in impactedQueues reports this
+     * transaction (identified by getXid()) as synced.
+     *
+     * On the first pass each unsynced journal is flushed; every later pass
+     * first sleeps for AIO_SLEEP_TIME (passed to usleep) and then polls the
+     * journal's write events again.
+     *
+     * Throws via THROW_STORE_EXCEPTION when journals are still unsynced after
+     * more than MAX_AIO_SLEEPS passes per impacted queue.
+     */
+    void sync(){
+        // Start as "not written" so the loop body runs at least once; starting
+        // at true would skip the loop and turn sync() into a no-op.
+        bool allWritten = false;
+        bool firstloop = true;
+        unsigned aio_sleep_cnt = 0;
+        while (!allWritten){
+            if (!firstloop) ::usleep(AIO_SLEEP_TIME);
+            allWritten = true;
+            unsigned qcnt = 0;
+            for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+                qcnt ++;
+                journal::jcntl* jc = static_cast<journal::jcntl*>((*i)->getExternalQueueStore());
+                if (jc && !(jc->is_dtx_synced(getXid())))
+                {
+                    if (firstloop)
+                        jc->flush();
+                    allWritten = false;
+                    jc->get_wr_events();
+                }
+            }
+            firstloop = false;
+            // Only count towards the timeout while something is still pending,
+            // so an empty queue set (qcnt == 0) can never trip the limit.
+            if (!allWritten && ++aio_sleep_cnt > MAX_AIO_SLEEPS*qcnt)
+            {
+                THROW_STORE_EXCEPTION("Store error, disk time out on sync for:" + getXid());
+            }
+        }
+    }
+
virtual ~TxnCtxt() { if(txn) abort(); }
void begin(DbEnv& env){ env.txn_begin(0, &txn, 0); }
- void commit(){ txn->commit(0); txn = 0; }
- void abort(){ txn->abort(); txn = 0; }
+ void commit(){ txn->commit(0); completeTXN(true); txn = 0; sync();}
+ void abort(){ txn->abort(); completeTXN(false); txn = 0; sync();}
DbTxn* get(){ return txn; }
virtual bool isTPC() { return false; }
+ virtual const std::string& getXid() { return tid; }
+
+ void deleteXidRecord(){ impactedQueues.clear(); }
+ void addXidRecord(const qpid::broker::PersistableQueue& queue){
+ impactedQueues.insert(&queue); }
+
};
class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
@@ -49,7 +129,7 @@
public:
TPCTxnCtxt(const std::string& _xid) : xid(_xid) {}
virtual bool isTPC() { return true; }
- const std::string& getXid() { return xid; }
+ virtual const std::string& getXid() { return xid; }
};
}}