[rhmessaging-commits] rhmessaging commits: r2198 - in store/branches/mrg-1.0/cpp: lib/jrnl and 1 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Mon Jul 14 13:05:32 EDT 2008
Author: kpvdr
Date: 2008-07-14 13:05:31 -0400 (Mon, 14 Jul 2008)
New Revision: 2198
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
store/branches/mrg-1.0/cpp/lib/JournalImpl.h
store/branches/mrg-1.0/cpp/lib/StorePlugin.cpp
store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
store/branches/mrg-1.0/cpp/lib/jrnl/deq_hdr.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.hpp
store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.hpp
store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp
store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
Log:
Backport of trunk r.2187-2188: Fixed 2PC multi-queue transaction atomicity problem. Any multi-queue 2PC commit/abort which is interrupted by failure will now be completed on recover for all queues which did not get processed. Local txns still have this problem, however. Some code tidy-up is also included. Removed old error messages from StorePlugin which are no longer possible owing to the removal of sync journal. 2188: Corrected signed/unsigned comparison (which does not show up on F9)
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -2,7 +2,7 @@
Copyright (C) 2007 Red Hat Software
This file is part of Red Hat Messaging.
-
+
Red Hat Messaging is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
@@ -26,6 +26,7 @@
#include "BindingDbt.h"
#include "BufferValue.h"
#include "IdPairDbt.h"
+#include "jrnl/txn_map.hpp"
#include "qpid/log/Statement.h"
#include "qpid/management/PackageMrgstore.h"
@@ -49,28 +50,35 @@
qpid::sys::Duration BdbMessageStore::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
-BdbMessageStore::BdbMessageStore(const char* envpath) : env(0),
- queueDb(&env, 0),
- configDb(&env, 0),
- exchangeDb(&env, 0),
- messageDb(&env, 0),
- mappingDb(&env, 0),
- bindingDb(&env, 0),
- generalDb(&env, 0),
- numJrnlFiles(defNumJrnlFiles),
- jrnlFsizePgs(defJrnlFileSizePgs),
- wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
- wcache_num_pages(JRNL_WMGR_DEF_PAGES),
- highestRid(0),
- isInit(false),
- envPath(envpath)
+BdbMessageStore::PreparedRecoverStruct::PreparedRecoverStruct(const u_int64_t _rid,
+ const bool _deq_flag,
+ const bool _commit_flag) :
+ rid(_rid),
+ deq_flag(_deq_flag),
+ commit_flag(_commit_flag)
+{}
+BdbMessageStore::BdbMessageStore(const char* envpath) :
+ env(0),
+ queueDb(&env, 0),
+ configDb(&env, 0),
+ exchangeDb(&env, 0),
+ messageDb(&env, 0),
+ mappingDb(&env, 0),
+ bindingDb(&env, 0),
+ generalDb(&env, 0),
+ numJrnlFiles(defNumJrnlFiles),
+ jrnlFsizePgs(defJrnlFileSizePgs),
+ wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
+ wcache_num_pages(JRNL_WMGR_DEF_PAGES),
+ highestRid(0),
+ isInit(false),
+ envPath(envpath)
{}
-
+
void BdbMessageStore::initManagement (Broker* broker)
{
- if (broker != 0)
- {
+ if (broker != 0) {
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
if (agent.get () != 0)
@@ -87,13 +95,16 @@
}
}
-bool BdbMessageStore::init(const std::string& dir, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize)
-{
+bool BdbMessageStore::init(const std::string& dir,
+ u_int16_t jfiles,
+ u_int32_t jfileSizePgs,
+ uint32_t wCachePageSize)
+{
if (isInit) return true;
numJrnlFiles = jfiles;
jrnlFsizePgs = jfileSizePgs;
-
+
// set wcache_pgsize_sblks and wcache_num_pages from wCachePageSize
wcache_pgsize_sblks = wCachePageSize * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
@@ -114,7 +125,7 @@
// 1 MiB total cache
wcache_num_pages = defTotWCacheSize / wcache_pgsize_sblks;
}
-
+
if (dir.size()>0) storeDir = dir;
journal::jdir::create_dir(getBdbBaseDir());
@@ -132,12 +143,11 @@
TxnCtxt txn;
try {
-
txn.begin(env, false);
open(queueDb, txn.get(), "queues.db", false);
open(configDb, txn.get(), "config.db", false);
open(exchangeDb, txn.get(), "exchanges.db", false);
- open(messageDb, txn.get(), "messages.db", false);
+ open(messageDb, txn.get(), "messages.db", false);
open(mappingDb, txn.get(), "mappings.db", true);
open(bindingDb, txn.get(), "bindings.db", true);
open(generalDb, txn.get(), "general.db", false);
@@ -153,7 +163,7 @@
txn.abort();
throw;
}
-
+
isInit = true;
QPID_LOG(info, "BdbMessageStore module initialized: dir=" << dir << "; jfiles=" << jfiles << "; jfileSizePgs=" << jfileSizePgs << "; wCachePageSize=" << wCachePageSize);
return true;
@@ -161,25 +171,21 @@
void BdbMessageStore::chkInitPreparedXidStore()
{
- if (!preparedXidStorePtr->is_init())
- {
+ if (!preparedXidStorePtr->is_ready()) {
u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2014 sblks (1 MiB).
preparedXidStorePtr->initialize(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE, defTotWCacheSize / wcache_pgsize_sblks, defXidStoreWCachePageSize);
}
}
-bool BdbMessageStore::init(const qpid::Options* options)
+bool BdbMessageStore::init(const qpid::Options* options)
{
const Options* opts = static_cast<const Options*>(options);
-
+
u_int16_t numJrnlFiles = opts->numJrnlFiles;
- if (numJrnlFiles < JRNL_MIN_NUM_FILES)
- {
+ if (numJrnlFiles < JRNL_MIN_NUM_FILES) {
numJrnlFiles = JRNL_MIN_NUM_FILES;
QPID_LOG(warning, "parameter num-jfiles (" << opts->numJrnlFiles << ") below allowable minimum (" << numJrnlFiles << "); changing this parameter to minimum value.");
- }
- else if (numJrnlFiles > JRNL_MAX_NUM_FILES)
- {
+ } else if (numJrnlFiles > JRNL_MAX_NUM_FILES) {
numJrnlFiles = JRNL_MAX_NUM_FILES;
QPID_LOG(warning, "parameter num-jfiles (" << opts->numJrnlFiles << ") above allowable maximum (" << numJrnlFiles << "); changing this parameter to maximum value.");
}
@@ -187,17 +193,14 @@
u_int32_t jrnlFsizePgs = opts->jrnlFsizePgs;
u_int32_t jrnlMinFsizePgs = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
u_int32_t jrnlMaxFsizePgs = JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
- if (jrnlFsizePgs < jrnlMinFsizePgs)
- {
+ if (jrnlFsizePgs < jrnlMinFsizePgs) {
jrnlFsizePgs = jrnlMinFsizePgs;
QPID_LOG(warning, "parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") below allowable minimum (" << jrnlFsizePgs << "); changing this parameter to minimum value.");
- }
- else if (jrnlFsizePgs > jrnlMaxFsizePgs)
- {
+ } else if (jrnlFsizePgs > jrnlMaxFsizePgs) {
jrnlFsizePgs = jrnlMaxFsizePgs;
QPID_LOG(warning, "parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") above allowable maximum (" << jrnlFsizePgs << "); changing this parameter to maximum value.");
}
-
+
u_int32_t jrnlWrCachePageSize = opts->wCachePageSize;
switch (jrnlWrCachePageSize)
{
@@ -212,14 +215,11 @@
break;
default:
u_int32_t oldJrnlWrCachePageSize = jrnlWrCachePageSize;
- if (oldJrnlWrCachePageSize == 0)
- {
+ if (oldJrnlWrCachePageSize == 0) {
// For zero value, use default
jrnlWrCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
QPID_LOG(warning, "parameter wcache-page-size (" << oldJrnlWrCachePageSize << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << jrnlWrCachePageSize << ")");
- }
- else
- {
+ } else {
// For any positive value, use closest value
if (oldJrnlWrCachePageSize < 6)
jrnlWrCachePageSize = 4;
@@ -240,17 +240,19 @@
return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, jrnlWrCachePageSize);
}
-void BdbMessageStore::open(Db& db, DbTxn* txn, const char* file, bool dupKey)
+void BdbMessageStore::open(Db& db,
+ DbTxn* txn,
+ const char* file,
+ bool dupKey)
{
if(dupKey) db.set_flags(DB_DUPSORT);
db.open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
dbs.push_back(&db);
}
-BdbMessageStore::~BdbMessageStore()
+BdbMessageStore::~BdbMessageStore()
{
try {
-
for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
}
@@ -279,17 +281,18 @@
(*i)->truncate(txn, &count, 0);
}
- txn->commit(0);
- try{
+ txn->commit(0);
+ try {
journal::jdir::delete_dir(getJrnlBaseDir(),true);
journal::jdir::delete_dir(getPxidBaseDir(),true);
- }
+ }
catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
- }
+ }
}
-void BdbMessageStore::create(PersistableQueue& queue, const FieldTable& args)
+void BdbMessageStore::create(PersistableQueue& queue,
+ const FieldTable& args)
{
checkInit();
if (queue.getPersistenceId()) {
@@ -315,15 +318,15 @@
string("JournalData"), defJournalGetEventsTimeout,
defJournalFlushTimeout);
}
+
queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
- try {
+ try {
// init will create the deque's for the init...
jQueue->initialize(localFileCount, localFileSize * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks);
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
": create() failed: " + e.what());
}
-
try {
if (!create(queueDb, queueIdSequence, queue)) {
THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
@@ -339,21 +342,20 @@
destroy(queueDb, queue);
deleteBindingsForQueue(queue);
qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
- if (eqs)
- {
+ if (eqs) {
JournalImpl* jQueue = static_cast<JournalImpl*>(eqs);
- jQueue->delete_jrnl_files();
+ jQueue->delete_jrnl_files();
queue.setExternalQueueStore(0); // will delete the journal if exists
}
}
-void BdbMessageStore::create(const PersistableExchange& exchange, const FieldTable& /*args*/)
+void BdbMessageStore::create(const PersistableExchange& exchange,
+ const FieldTable& /*args*/)
{
checkInit();
if (exchange.getPersistenceId()) {
THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName());
}
-
try {
if (!create(exchangeDb, exchangeIdSequence, exchange)) {
THROW_STORE_EXCEPTION("Exchange already exists: " + exchange.getName());
@@ -378,7 +380,6 @@
if (general.getPersistenceId()) {
THROW_STORE_EXCEPTION("General configuration item already created");
}
-
try {
if (!create(generalDb, generalIdSequence, general)) {
THROW_STORE_EXCEPTION("General configuration already exists");
@@ -394,7 +395,9 @@
destroy(generalDb, general);
}
-bool BdbMessageStore::create(Db& db, IdSequence& seq, const Persistable& p)
+bool BdbMessageStore::create(Db& db,
+ IdSequence& seq,
+ const Persistable& p)
{
u_int64_t id (seq.next());
Dbt key(&id, sizeof(id));
@@ -416,11 +419,13 @@
}
-void BdbMessageStore::bind(const PersistableExchange& e, const PersistableQueue& q,
- const std::string& k, const FieldTable& a)
+void BdbMessageStore::bind(const PersistableExchange& e,
+ const PersistableQueue& q,
+ const std::string& k,
+ const FieldTable& a)
{
checkInit();
- IdDbt key(e.getPersistenceId());
+ IdDbt key(e.getPersistenceId());
BindingDbt value(e, q, k, a);
TxnCtxt txn;
txn.begin(env, true);
@@ -428,8 +433,10 @@
txn.commit();
}
-void BdbMessageStore::unbind(const PersistableExchange& e, const PersistableQueue& q,
- const std::string& k, const FieldTable&)
+void BdbMessageStore::unbind(const PersistableExchange& e,
+ const PersistableQueue& q,
+ const std::string& k,
+ const FieldTable&)
{
checkInit();
deleteBinding(e, q, k);
@@ -450,7 +457,7 @@
try {
//read all queues, calls recoversMessages
recoverQueues(txn, registry, queues, prepared, messages);
-
+
//recover exchange & bindings:
recoverExchanges(txn, registry, exchanges);
recoverBindings(txn, exchanges, queues);
@@ -466,35 +473,45 @@
}
//recover transactions:
- for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
-
+ for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
+ std::auto_ptr<TPCTransactionContext> txn(tpcc);
+ tpcc->prepare(preparedXidStorePtr.get());
+
// Restore data token state in TxnCtxt
- xid_rid_map_citr citr = preparedMap.find(i->xid);
- if (citr == preparedMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedMap");
- tpcc->recoverDtok(citr->second, i->xid);
- tpcc->addXidRecord(preparedXidStorePtr.get());
+ PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(i->xid);
+ if (citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+ tpcc->recoverDtok(citr->second.rid, i->xid);
- RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, std::auto_ptr<TPCTransactionContext>(tpcc));
+ // If a record is found that is dequeued but not committed/aborted from preparedXidStore, then a complete() call
+ // was interrupted part way through committing/aborting the impacted queues. Complete this process.
+ bool incomplTxnFlag = citr->second.deq_flag;
+
+ RecoverableTransaction::shared_ptr dtx = registry.recoverTransaction(i->xid, txn);
if (i->enqueues.get()) {
for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
- dtx->enqueue(queues[j->first], messages[j->second]);
+ if (!incomplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
}
}
if (i->dequeues.get()) {
for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
- dtx->dequeue(queues[j->first], messages[j->second]);
+ if (!incomplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
}
}
+
+ if (incomplTxnFlag) tpcc->complete(citr->second.commit_flag);
}
registry.recoveryComplete();
}
-void BdbMessageStore::recoverQueues(TxnCtxt& txn, RecoveryManager& registry, queue_index& queue_index, txn_list&
- prepared, message_index& messages)
+void BdbMessageStore::recoverQueues(TxnCtxt& txn,
+ RecoveryManager& registry,
+ queue_index& queue_index,
+ txn_list& prepared,
+ message_index& messages)
{
Cursor queues;
queues.open(queueDb, txn.get());
@@ -510,7 +527,7 @@
RecoverableQueue::shared_ptr queue = registry.recoverQueue(buffer);
//set the persistenceId and update max as required
queue->setPersistenceId(key.id);
-
+
const char* queueName = queue->getName().c_str();
JournalImpl* jQueue = 0;
{
@@ -518,14 +535,14 @@
jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
}
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
-
+
try
{
u_int64_t thisHighestRid = 0;
jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, wcache_num_pages, wcache_pgsize_sblks, &prepared, thisHighestRid, key.id); // start recovery
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
- recoverMessages(txn, registry, queue, prepared, messages);
+ recoverMessages(txn, registry, queue, prepared, messages);
jQueue->recover_complete(); // start journal.
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
@@ -544,7 +561,9 @@
}
-void BdbMessageStore::recoverExchanges(TxnCtxt& txn, RecoveryManager& registry, exchange_index& index)
+void BdbMessageStore::recoverExchanges(TxnCtxt& txn,
+ RecoveryManager& registry,
+ exchange_index& index)
{
//TODO: this is a copy&paste from recoverQueues - refactor!
Cursor exchanges;
@@ -566,13 +585,15 @@
exchangeIdSequence.reset(maxExchangeId + 1);
}
-void BdbMessageStore::recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues)
+void BdbMessageStore::recoverBindings(TxnCtxt& txn,
+ exchange_index& exchanges,
+ queue_index& queues)
{
Cursor bindings;
bindings.open(bindingDb, txn.get());
IdDbt key;
- Dbt value;
+ Dbt value;
while (bindings.next(key, value)) {
Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
if (buffer.available() < 8) {
@@ -582,7 +603,7 @@
uint64_t queueId = buffer.getLongLong();
string queueName;
string routingkey;
- FieldTable args;
+ FieldTable args;
buffer.getShortString(queueName);
buffer.getShortString(routingkey);
buffer.get(args);
@@ -599,7 +620,8 @@
}
}
-void BdbMessageStore::recoverGeneral(TxnCtxt& txn, RecoveryManager& registry)
+void BdbMessageStore::recoverGeneral(TxnCtxt& txn,
+ RecoveryManager& registry)
{
Cursor items;
items.open(generalDb, txn.get());
@@ -620,16 +642,18 @@
}
-void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/, qpid::broker::RecoveryManager& recovery,
- qpid::broker::RecoverableQueue::shared_ptr& queue, txn_list& locked, message_index& prepared)
+void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/,
+ qpid::broker::RecoveryManager& recovery,
+ qpid::broker::RecoverableQueue::shared_ptr& queue,
+ txn_list& prepared,
+ message_index& messages)
{
+ size_t preambleLength = sizeof(u_int32_t)/*header size*/;
- size_t preambleLength = sizeof(u_int32_t)/*header size*/;
-
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
DataTokenImpl dtokp;
size_t readSize = 0;
- unsigned msg_count=0;
+ unsigned msg_count = 0;
// TODO: This optimization to skip reading if there are no enqueued messages to read
// breaks the python system test in phase 6 with "Exception: Cannot write lock file"
@@ -642,58 +666,63 @@
bool transientFlag = false;
bool externalFlag = false;
+ dtokp.set_wstate(DataTokenImpl::ENQ);
- dtokp.set_wstate(DataTokenImpl::ENQ);
- // read the message from the Journal.
+ // Read the message from the Journal.
try {
-
- //std::cout << jc->dirname() <<"-queueName:" << queue->getName() << "-enq count:" << jc->get_enq_cnt() << std::endl;
-
unsigned aio_sleep_cnt = 0;
while (read) {
rhm::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp);
readSize = dtokp.dsize();
-
+
switch (res)
{
- case rhm::journal::RHM_IORES_SUCCESS:{
- msg_count++;
- RecoverableMessage::shared_ptr msg;
- char* data = (char*)dbuff;
-
- unsigned headerSize;
- if (externalFlag){
- msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl
- } else {
- headerSize = Buffer(data, preambleLength).getLong();
- Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
- msg = recovery.recoverMessage(headerBuff);
- }
- msg->setPersistenceId(dtokp.rid());
-
- u_int32_t contentOffset = headerSize + preambleLength;
- u_int64_t contentSize = readSize - contentOffset;
- if (msg->loadContent(contentSize) && !externalFlag) {
- //now read the content
- Buffer contentBuff(data + contentOffset, contentSize);
- msg->decodeContent(contentBuff);
- }
+ case rhm::journal::RHM_IORES_SUCCESS: {
+ msg_count++;
+ RecoverableMessage::shared_ptr msg;
+ char* data = (char*)dbuff;
- if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid()) ) {
- prepared[dtokp.rid()] = msg;
- } else {
- queue->recover(msg);
- }
-
- dtokp.reset();
- dtokp.set_wstate(DataTokenImpl::ENQ);
-
- if (xidbuff)
- ::free(xidbuff);
- else if (dbuff)
- ::free(dbuff);
- aio_sleep_cnt = 0;
- break;
+ unsigned headerSize;
+ if (externalFlag) {
+ msg = getExternMessage(recovery, dtokp.rid(), headerSize); // large message external to jrnl
+ } else {
+ headerSize = Buffer(data, preambleLength).getLong();
+ Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
+ msg = recovery.recoverMessage(headerBuff);
+ }
+ msg->setPersistenceId(dtokp.rid());
+
+ u_int32_t contentOffset = headerSize + preambleLength;
+ u_int64_t contentSize = readSize - contentOffset;
+ if (msg->loadContent(contentSize) && !externalFlag) {
+ //now read the content
+ Buffer contentBuff(data + contentOffset, contentSize);
+ msg->decodeContent(contentBuff);
+ }
+
+ if (PreparedTransaction::isLocked(prepared, queue->getPersistenceId(), dtokp.rid()) ) {
+ std::string xid((char*)xidbuff, xidbuffSize);
+ PreparedRecoverMapCitr citr = preparedXidStoreRecoverMap.find(xid);
+ if (xid.size() > 0 && citr == preparedXidStoreRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in preparedXidStoreRecoverMap");
+ if (xid.size() > 0 && citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
+ if (citr->second.commit_flag)
+ queue->recover(msg);
+ } else {
+ messages[dtokp.rid()] = msg;
+ }
+ } else {
+ queue->recover(msg);
+ }
+
+ dtokp.reset();
+ dtokp.set_wstate(DataTokenImpl::ENQ);
+
+ if (xidbuff)
+ ::free(xidbuff);
+ else if (dbuff)
+ ::free(dbuff);
+ aio_sleep_cnt = 0;
+ break;
}
case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
@@ -702,7 +731,7 @@
break;
case rhm::journal::RHM_IORES_EMPTY:
read = false;
- break; // done with all messages. ((add call in jrnl to test that _emap is empty.
+ break; // done with all messages. (add call in jrnl to test that _emap is empty.)
default:
assert( "Store Error: Unexpected msg state");
} // switch
@@ -713,33 +742,37 @@
}
}
-RecoverableMessage::shared_ptr BdbMessageStore::getExternMessage(qpid::broker::RecoveryManager& recovery,
- uint64_t messageId, unsigned& headerSize)
+RecoverableMessage::shared_ptr BdbMessageStore::getExternMessage(qpid::broker::RecoveryManager& recovery,
+ uint64_t messageId,
+ unsigned& headerSize)
{
Dbt key (&messageId, sizeof(messageId));
- size_t preamble_length = sizeof(u_int32_t)/*header size*/;
+ size_t preamble_length = sizeof(u_int32_t); /*header size*/
BufferValue value(preamble_length, 0);
value.buffer.record();
if (messageDb.get(0, &key, &value, 0) == DB_NOTFOUND) {
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
}
//read header only to begin with
headerSize = value.buffer.getLong();
BufferValue header(headerSize, preamble_length);
if (messageDb.get(0, &key, &header, 0) == DB_NOTFOUND) {
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
}
return recovery.recoverMessage(header.buffer);
-}
+}
-int BdbMessageStore::enqueueMessage(TxnCtxt& txn, IdDbt& msgId, RecoverableMessage::shared_ptr& msg,
- queue_index& index, txn_list& locked,
- message_index& prepared)
+int BdbMessageStore::enqueueMessage(TxnCtxt& txn,
+ IdDbt& msgId,
+ RecoverableMessage::shared_ptr& msg,
+ queue_index& index,
+ txn_list& prepared,
+ message_index& messages)
{
Cursor mappings;
mappings.open(mappingDb, txn.get());
@@ -751,10 +784,10 @@
if (index.find(value.id) == index.end()) {
QPID_LOG(warning, "Recovered message for queue that no longer exists");
mappings->del(0);
- } else {
+ } else {
RecoverableQueue::shared_ptr queue = index[value.id];
- if (PreparedTransaction::isLocked(locked, value.id, msgId.id)) {
- prepared[msgId.id] = msg;
+ if (PreparedTransaction::isLocked(prepared, value.id, msgId.id)) {
+ messages[msgId.id] = msg;
} else {
queue->recover(msg);
}
@@ -765,22 +798,69 @@
return count;
}
+void BdbMessageStore::recoverPreparedXidJournal()
+{
+ if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf")) {
+ u_int64_t thisHighestRid;
+ preparedXidStorePtr->recover(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
+ JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks, defXidStoreWCachePageSize,
+ 0, thisHighestRid, 0);
+ if (thisHighestRid > highestRid)
+ highestRid = thisHighestRid;
+ preparedXidStorePtr->recover_complete(); // start journal.
+ }
+}
+
+void BdbMessageStore::getPreparedXidMap(PreparedRecoverMap& prepXidMap)
+{
+ if (preparedXidStorePtr.get()) {
+ if (!preparedXidStorePtr->is_ready())
+ recoverPreparedXidJournal();
+
+ // TODO: The journal will return a const txn_map and the txn_map will support
+ // const operations at some point. Using non-const txn_map this way is ugly...
+ journal::txn_map& tmap = preparedXidStorePtr->get_txn_map();
+ std::vector<std::string> xidList;
+ tmap.xid_list(xidList);
+ for (std::vector<std::string>::const_iterator i = xidList.begin(); i<xidList.end(); i++) {
+ journal::txn_data_list txnList = tmap.get_tdata_list(*i);
+ unsigned enqCnt = 0;
+ unsigned deqCnt = 0;
+ u_int64_t rid = 0;
+ bool commitFlag = false;
+ for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+ if (j->_enq_flag) {
+ rid = j->_rid;
+ enqCnt++;
+ } else {
+ commitFlag = j->_commit_flag;
+ deqCnt++;
+ }
+ }
+ assert(enqCnt == 1);
+ assert(deqCnt <= 1);
+ prepXidMap.insert(PreparedRecoverMapPair(*i, PreparedRecoverStruct(rid, deqCnt == 1, commitFlag)));
+ }
+ }
+}
+
void BdbMessageStore::recoverXids(txn_list& txns)
{
- std::set<string> preparedXidSet;
- collectPreparedXids(preparedXidSet);
+ if (!preparedXidStorePtr->is_ready())
+ getPreparedXidMap(preparedXidStoreRecoverMap);
// Abort unprepaired xids and populate the locked maps
- for (std::set<string>::iterator i = preparedXidSet.begin(); i != preparedXidSet.end(); i++) {
+ for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
LockedMappings::shared_ptr enq_ptr;
enq_ptr.reset(new LockedMappings);
LockedMappings::shared_ptr deq_ptr;
deq_ptr.reset(new LockedMappings);
- txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
+ txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr));
}
}
-void BdbMessageStore::readLockedMappings(Db& db, txn_lock_map& mappings)
+void BdbMessageStore::readLockedMappings(Db& db,
+ txn_lock_map& mappings)
{
Cursor c;
c.open(db, 0);
@@ -795,54 +875,10 @@
void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
{
- if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf"))
- {
- u_int64_t thisHighestRid;
- preparedXidStorePtr->recover(defXidStoreNumJrnlFiles, defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
- JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks, defXidStoreWCachePageSize,
- 0, thisHighestRid, 0);
- if (thisHighestRid > highestRid)
- highestRid = thisHighestRid;
- try {
- void* dbuff = NULL; size_t dbuffSize = 0;
- void* xidbuff = NULL; size_t xidbuffSize = 0;
- bool transientFlag = false;
- bool externalFlag = false;
- DataTokenImpl dtokp;
- bool done = false;
- long aio_sleep_cnt = 0;
- while (!done) {
- dtokp.reset();
- dtokp.set_wstate(DataTokenImpl::ENQ);
- rhm::journal::iores res = preparedXidStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp);
- switch (res) {
- case rhm::journal::RHM_IORES_SUCCESS:
- if (xidbuffSize > 0) {
- xids.insert(std::string((const char*)xidbuff, xidbuffSize));
- preparedMap[std::string((const char*)xidbuff, xidbuffSize)] = dtokp.rid();
- ::free(xidbuff);
- } else {
- THROW_STORE_EXCEPTION("No XID found in BdbMessageStore::collectPreparedXids()");
- }
- aio_sleep_cnt = 0;
- break;
- case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::collectPreparedXids()");
- ::usleep(AIO_SLEEP_TIME);
- break;
- case rhm::journal::RHM_IORES_EMPTY:
- done = true;
- break;
- default:
- assert( "Store Error: Unexpected msg state");
- }
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Prepared XID journal: collectPreparedXids() failed: ") + e.what());
- }
-
- preparedXidStorePtr->recover_complete(); // start journal.
+ if (!preparedXidStorePtr->is_ready())
+ getPreparedXidMap(preparedXidStoreRecoverMap);
+ for (PreparedRecoverMapCitr i = preparedXidStoreRecoverMap.begin(); i != preparedXidStoreRecoverMap.end(); i++) {
+ xids.insert(i->first);
}
}
@@ -864,8 +900,9 @@
txn.abort();
throw;
}
- }
+ }
}
+
void BdbMessageStore::destroy(PersistableMessage& msg)
{
checkInit();
@@ -880,17 +917,19 @@
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error destroying message", e);
- }
+ }
}
}
-
-u_int64_t BdbMessageStore::getRecordSize(Db& db, Dbt& key)
+u_int64_t BdbMessageStore::getRecordSize(Db& db,
+ Dbt& key)
{
return getRecordSize(0, db, key);
}
-u_int64_t BdbMessageStore::getRecordSize(DbTxn* txn, Db& db, Dbt& key)
+u_int64_t BdbMessageStore::getRecordSize(DbTxn* txn,
+ Db& db,
+ Dbt& key)
{
Dbt peek;
peek.set_flags(DB_DBT_USERMEM);
@@ -906,7 +945,8 @@
return peek.get_size();
}
-void BdbMessageStore::appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data)
+void BdbMessageStore::appendContent(intrusive_ptr<const PersistableMessage>& msg,
+ const std::string& data)
{
checkInit();
u_int64_t messageId (msg->getPersistenceId());
@@ -932,28 +972,31 @@
}
} else {
THROW_STORE_EXCEPTION("Cannot append content. Message not known to store!");
- }
+ }
}
void BdbMessageStore::loadContent(const qpid::broker::PersistableQueue& queue,
- intrusive_ptr<const PersistableMessage>& msg, std::string& data, u_int64_t offset, u_int32_t length)
+ intrusive_ptr<const PersistableMessage>& msg,
+ std::string& data,
+ u_int64_t offset,
+ u_int32_t length)
{
checkInit();
- u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/+ msg->encodedHeaderSize();
+ u_int64_t realOffset = offset + sizeof(u_int32_t)/*header length*/ + msg->encodedHeaderSize();
u_int64_t messageId (msg->getPersistenceId());
-
+
if (messageId != 0) {
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
- if (jc && jc->is_enqueued(messageId) ){
- if (jc->loadMsgContent(messageId, data, realOffset, length)){
+ if (jc && jc->is_enqueued(messageId) ) {
+ if (jc->loadMsgContent(messageId, data, realOffset, length)) {
return;
}
}
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
": loadContent() failed: " + e.what());
- }
+ }
TxnCtxt txn;
txn.begin(env, true);
try {
@@ -964,11 +1007,11 @@
value.set_ulen(length);
value.set_doff(realOffset);
value.set_dlen(length);
- int status = messageDb.get(txn.get(), &key, &value, 0);
+ int status = messageDb.get(txn.get(), &key, &value, 0);
if (status == DB_NOTFOUND) {
txn.abort();
delete [] buffer;
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
+ THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
} else {
txn.commit();
data.assign(buffer, value.get_size());
@@ -989,11 +1032,11 @@
std::string qn = queue.getName();
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
- if (jc){
+ if (jc) {
// TODO: check if this result should be used...
/*rhm::journal::iores res =*/ jc->flush();
}
- }catch (const journal::jexception& e) {
+ } catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() );
}
}
@@ -1029,14 +1072,14 @@
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
}
-void BdbMessageStore::store(const PersistableQueue* queue,
- TxnCtxt* txn, Dbt& messageId,
- intrusive_ptr<PersistableMessage>& message,
+void BdbMessageStore::store(const PersistableQueue* queue,
+ TxnCtxt* txn, Dbt& messageId,
+ intrusive_ptr<PersistableMessage>& message,
bool newId)
{
u_int32_t headerSize = message->encodedHeaderSize();
u_int64_t size = message->encodedSize() + sizeof(u_int32_t);
- char* buff= 0;
+ char* buff = 0;
if (!message->isContentReleased() )
{
buff = static_cast<char*>(::alloca(size)); // long + headers + content
@@ -1046,23 +1089,22 @@
}
try {
-
- if ( queue ) {
+ if (queue) {
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
dtokp->setSourceMessage(message);
dtokp->set_external_rid(true);
dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
-
+
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
- if (txn->getXid().empty()){
- if (message->isContentReleased()){
+ if (txn->getXid().empty()) {
+ if (message->isContentReleased()) {
jc->enqueue_extern_data_record(size, dtokp.get(), false);
} else {
jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
}
- }else {
- if (message->isContentReleased()){
+ } else {
+ if (message->isContentReleased()) {
jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
} else {
jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
@@ -1083,7 +1125,7 @@
void BdbMessageStore::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
-{
+{
checkInit();
u_int64_t queueId (queue.getPersistenceId());
u_int64_t messageId (msg->getPersistenceId());
@@ -1101,28 +1143,27 @@
} else {
txn = &implicit;
}
-
+
// add queue* to the txn map..
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- async_dequeue(ctxt, msg, queue);
-
+ async_dequeue(ctxt, msg, queue);
+
msg->dequeueComplete();
}
-void BdbMessageStore::async_dequeue(
- TransactionContext* ctxt,
- intrusive_ptr<PersistableMessage>& msg,
- const PersistableQueue& queue)
+void BdbMessageStore::async_dequeue(TransactionContext* ctxt,
+ intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& queue)
{
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
ddtokp->addRef();
ddtokp->setSourceMessage(msg);
ddtokp->set_external_rid(true);
- ddtokp->set_rid(messageIdSequence.next());
+ ddtokp->set_rid(messageIdSequence.next());
ddtokp->set_dequeue_rid(msg->getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
string tid;
- if (ctxt){
+ if (ctxt) {
TxnCtxt* txn = check(ctxt);
tid = txn->getXid();
}
@@ -1133,7 +1174,7 @@
} else {
jc->dequeue_txn_data_record(ddtokp.get(), tid);
}
- } catch (const journal::jexception& e) {
+ } catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
}
}
@@ -1144,15 +1185,17 @@
return 0;
}
-bool BdbMessageStore::deleteIfUnused(DbTxn* txn, Dbt& messageId)
+bool BdbMessageStore::deleteIfUnused(DbTxn* txn,
+ Dbt& messageId)
{
Cursor cursor;
cursor.open(mappingDb, txn);
-
return deleteIfUnused(cursor, txn, messageId);
}
-bool BdbMessageStore::deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId)
+bool BdbMessageStore::deleteIfUnused(Cursor& cursor,
+ DbTxn* txn,
+ Dbt& messageId)
{
if (isUnused(cursor, messageId)) {
messageDb.del(txn, &messageId, 0);
@@ -1175,7 +1218,8 @@
}
}
-void BdbMessageStore::completed(TPCTxnCtxt& txn, bool commit)
+void BdbMessageStore::completed(TPCTxnCtxt& txn,
+ bool commit)
{
try {
// Nothing to do if not prepared
@@ -1185,7 +1229,7 @@
DataTokenImpl* dtokp = txn.getDtok();
dtokp->set_dequeue_rid(dtokp->rid());
dtokp->set_rid(messageIdSequence.next());
- preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid());
+ preparedXidStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
}
txn.complete(commit);
} catch (const std::exception& e) {
@@ -1194,7 +1238,7 @@
}
}
-auto_ptr<TransactionContext> BdbMessageStore::begin()
+auto_ptr<TransactionContext> BdbMessageStore::begin()
{
checkInit();
// pass sequence number for c/a
@@ -1214,7 +1258,7 @@
checkInit();
TPCTxnCtxt* txn = dynamic_cast<TPCTxnCtxt*>(&ctxt);
if(!txn) throw InvalidTransactionContextException();
-
+
try {
chkInitPreparedXidStore();
txn->incrDtokRef();
@@ -1222,8 +1266,7 @@
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, txn->getXid(), false);
- txn->addXidRecord(preparedXidStorePtr.get());
-
+ txn->prepare(preparedXidStorePtr.get());
// make sure all the data is written to disk before returning
txn->sync();
} catch (const std::exception& e) {
@@ -1232,18 +1275,18 @@
}
}
-void BdbMessageStore::commit(TransactionContext& ctxt)
+void BdbMessageStore::commit(TransactionContext& ctxt)
{
checkInit();
TxnCtxt* txn(check(&ctxt));
if (txn->isTPC()) {
- completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
+ completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
} else {
txn->complete(true);
}
}
-void BdbMessageStore::abort(TransactionContext& ctxt)
+void BdbMessageStore::abort(TransactionContext& ctxt)
{
checkInit();
TxnCtxt* txn(check(&ctxt));
@@ -1261,7 +1304,10 @@
return txn;
}
-void BdbMessageStore::put(Db& db, DbTxn* txn, Dbt& key, Dbt& value)
+void BdbMessageStore::put(Db& db,
+ DbTxn* txn,
+ Dbt& key,
+ Dbt& value)
{
try {
int status = db.put(txn, &key, &value, DB_NODUPDATA);
@@ -1275,8 +1321,10 @@
}
}
-
-bool BdbMessageStore::deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt& value)
+bool BdbMessageStore::deleteKeyValuePair(Db& db,
+ DbTxn* txn,
+ Dbt& key,
+ Dbt& value)
{
Cursor cursor;
cursor.open(db, txn);
@@ -1298,9 +1346,9 @@
try {
Cursor bindings;
bindings.open(bindingDb, txn.get());
-
+
IdDbt key;
- Dbt value;
+ Dbt value;
while (bindings.next(key, value)) {
Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
if (buffer.available() < 8) {
@@ -1319,16 +1367,18 @@
QPID_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId());
}
-void BdbMessageStore::deleteBinding(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& bkey)
+void BdbMessageStore::deleteBinding(const PersistableExchange& exchange,
+ const PersistableQueue& queue,
+ const std::string& bkey)
{
TxnCtxt txn;
txn.begin(env, true);
try {
Cursor bindings;
bindings.open(bindingDb, txn.get());
-
+
IdDbt key(exchange.getPersistenceId());
- Dbt value;
+ Dbt value;
for (int status = bindings->get(&key, &value, DB_SET); status == 0; status = bindings->get(&key, &value, DB_NEXT_DUP)) {
Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
@@ -1353,21 +1403,21 @@
txn.commit();
}
-string BdbMessageStore::getJrnlBaseDir()
+string BdbMessageStore::getJrnlBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/jrnl/" ;
return dir.str();
}
-string BdbMessageStore::getBdbBaseDir()
+string BdbMessageStore::getBdbBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/dat/" ;
return dir.str();
}
-string BdbMessageStore::getPxidBaseDir()
+string BdbMessageStore::getPxidBaseDir()
{
std::stringstream dir;
dir << storeDir << "/rhm/pxid/" ;
@@ -1386,19 +1436,19 @@
dir << std::setw(4);
dir << std::setfill('0');
u_int32_t count = 0;
- for (u_int32_t i=0; i < strlen(queueName); i++)
- count += queueName[i];
-
- dir << (count%20);
+ for (u_int32_t i = 0; i < strlen(queueName); i++) {
+ count += queueName[i];
+ }
+ dir << (count % 20);
dir << "/" << queueName << "/";
return dir.str();
}
BdbMessageStore::Options::Options(const std::string& name) :
- qpid::Options(name),
- numJrnlFiles(8),
- jrnlFsizePgs(24),
- wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
+ qpid::Options(name),
+ numJrnlFiles(8),
+ jrnlFsizePgs(24),
+ wCachePageSize(JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024)
{
addOptions()
("store-dir", qpid::optValue(storeDir, "DIR"),
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h 2008-07-14 17:05:31 UTC (rev 2198)
@@ -2,7 +2,7 @@
Copyright (C) 2007 Red Hat Software
This file is part of Red Hat Messaging.
-
+
Red Hat Messaging is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
@@ -53,15 +53,25 @@
*/
class BdbMessageStore : public qpid::broker::MessageStore, public qpid::management::Manageable
{
+ protected:
typedef std::map<u_int64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
typedef std::map<u_int64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
typedef std::map<u_int64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;
-
+
typedef LockedMappings::map txn_lock_map;
typedef boost::ptr_list<PreparedTransaction> txn_list;
- typedef std::map<std::string, u_int64_t> xid_rid_map;
- typedef xid_rid_map::const_iterator xid_rid_map_citr;
+ // Structs for preparedXidStore recover state
+ struct PreparedRecoverStruct {
+ u_int64_t rid;
+ bool deq_flag;
+ bool commit_flag;
+ PreparedRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
+ };
+ typedef PreparedRecoverStruct PreparedRecover;
+ typedef std::map<std::string, PreparedRecover> PreparedRecoverMap;
+ typedef PreparedRecoverMap::const_iterator PreparedRecoverMapCitr;
+ typedef std::pair<std::string, PreparedRecoverStruct> PreparedRecoverMapPair;
// Default store settings
static const u_int16_t defNumJrnlFiles = 8;
@@ -80,7 +90,11 @@
Db mappingDb;
Db bindingDb;
Db generalDb;
+
+ // Pointer to prepared XID journal instance
boost::shared_ptr<JournalImpl> preparedXidStorePtr;
+ PreparedRecoverMap preparedXidStoreRecoverMap;
+
IdSequence queueIdSequence;
IdSequence exchangeIdSequence;
IdSequence generalIdSequence;
@@ -90,7 +104,6 @@
u_int32_t jrnlFsizePgs;
u_int32_t wcache_pgsize_sblks;
u_int16_t wcache_num_pages;
- xid_rid_map preparedMap;
u_int64_t highestRid;
bool isInit;
const char* envPath;
@@ -99,57 +112,99 @@
qpid::management::Store::shared_ptr mgmtObject;
qpid::sys::Mutex jrnlCreateLock;
- bool mode(const bool mode, const bool force);
- void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
- txn_list& locked, message_index& messages);
- void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, queue_index& index,
- txn_list& locked, message_index& prepared);
- void recoverMessages(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery,
- qpid::broker::RecoverableQueue::shared_ptr& queue,
- txn_list& locked, message_index& prepared);
- qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
- uint64_t mId, unsigned& headerSize);
- void recoverExchanges(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery, exchange_index& index);
- void recoverBindings(TxnCtxt& txn, exchange_index& exchanges, queue_index& queues);
- void recoverGeneral(TxnCtxt& txn, qpid::broker::RecoveryManager& recovery);
- int enqueueMessage(TxnCtxt& txn, IdDbt& msgId, qpid::broker::RecoverableMessage::shared_ptr& msg,
- queue_index& index, txn_list& locked, message_index& prepared);
+ void recoverQueues(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ queue_index& index,
+ txn_list& locked,
+ message_index& messages);
+ void recoverMessages(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ queue_index& index,
+ txn_list& locked,
+ message_index& prepared);
+ void recoverMessages(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ qpid::broker::RecoverableQueue::shared_ptr& queue,
+ txn_list& locked,
+ message_index& prepared);
+ qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
+ uint64_t mId, unsigned& headerSize);
+ void recoverExchanges(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery,
+ exchange_index& index);
+ void recoverBindings(TxnCtxt& txn,
+ exchange_index& exchanges,
+ queue_index& queues);
+ void recoverGeneral(TxnCtxt& txn,
+ qpid::broker::RecoveryManager& recovery);
+ int enqueueMessage(TxnCtxt& txn,
+ IdDbt& msgId,
+ qpid::broker::RecoverableMessage::shared_ptr& msg,
+ queue_index& index,
+ txn_list& locked,
+ message_index& prepared);
+ void recoverPreparedXidJournal();
+ void getPreparedXidMap(PreparedRecoverMap& prepXidMap);
void recoverXids(txn_list& txns);
- void readLockedMappings(Db& db, txn_lock_map& mappings);
+ void readLockedMappings(Db& db,
+ txn_lock_map& mappings);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
- void store(const qpid::broker::PersistableQueue* queue, TxnCtxt* txn,
- Dbt& messageId,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
- bool newId);
- void async_dequeue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
- bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
- bool deleteIfUnused(DbTxn* txn, Dbt& messageId);
- bool isUnused(Cursor& cursor, Dbt& messageId);
- void destroy(Db& db, const qpid::broker::Persistable& p);
- bool create(Db& db, IdSequence& seq, const qpid::broker::Persistable& p);
- void completed(TPCTxnCtxt& txn, bool commit);
- void record2pcOp(Db& db, TPCTxnCtxt& txn, u_int64_t messageId, u_int64_t queueId);
+ void store(const qpid::broker::PersistableQueue* queue,
+ TxnCtxt* txn,
+ Dbt& messageId,
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
+ bool newId);
+ void async_dequeue(qpid::broker::TransactionContext* ctxt,
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
+ bool deleteIfUnused(Cursor& cursor,
+ DbTxn* txn,
+ Dbt& messageId);
+ bool deleteIfUnused(DbTxn* txn,
+ Dbt& messageId);
+ bool isUnused(Cursor& cursor,
+ Dbt& messageId);
+ void destroy(Db& db,
+ const qpid::broker::Persistable& p);
+ bool create(Db& db,
+ IdSequence& seq,
+ const qpid::broker::Persistable& p);
+ void completed(TPCTxnCtxt& txn,
+ bool commit);
+ void record2pcOp(Db& db,
+ TPCTxnCtxt& txn,
+ u_int64_t messageId,
+ u_int64_t queueId);
void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
- void deleteBinding(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
+ void deleteBinding(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
const std::string& key);
- u_int64_t getRecordSize(Db& db, Dbt& key);
- u_int64_t getRecordSize(DbTxn* txn, Db& db, Dbt& key);
- void put(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
- bool deleteKeyValuePair(Db& db, DbTxn* txn, Dbt& key, Dbt& value);
- void open(Db& db, DbTxn* txn, const char* file, bool dupKey);
-
-
+ u_int64_t getRecordSize(Db& db,
+ Dbt& key);
+ u_int64_t getRecordSize(DbTxn* txn,
+ Db& db,
+ Dbt& key);
+ void put(Db& db,
+ DbTxn* txn,
+ Dbt& key,
+ Dbt& value);
+ bool deleteKeyValuePair(Db& db,
+ DbTxn* txn,
+ Dbt& key,
+ Dbt& value);
+ void open(Db& db,
+ DbTxn* txn,
+ const char* file,
+ bool dupKey);
+
// journal functions
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
std::string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for example /var/rhm/ + queueDir/
std::string getJrnlDir(const char* queueName);
- std::string getJrnlBaseDir();
- std::string getBdbBaseDir();
- std::string getPxidBaseDir();
+ std::string getJrnlBaseDir();
+ std::string getBdbBaseDir();
+ std::string getPxidBaseDir();
inline void checkInit() {
if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs, defWCachePageSize); isInit = true;
}
@@ -183,51 +238,71 @@
typedef boost::shared_ptr<BdbMessageStore> shared_ptr;
BdbMessageStore(const char* envpath = 0);
+
virtual ~BdbMessageStore();
+
bool init(const qpid::Options* options);
- bool init(const std::string& dir, u_int16_t jfiles, u_int32_t jfileSizePgs, uint32_t wCachePageSize);
+
+ bool init(const std::string& dir,
+ u_int16_t jfiles,
+ u_int32_t jfileSizePgs,
+ uint32_t wCachePageSize);
+
void initManagement (qpid::broker::Broker* broker);
void truncate();
void create(qpid::broker::PersistableQueue& queue,
const qpid::framing::FieldTable& args);
+
void destroy(qpid::broker::PersistableQueue& queue);
void create(const qpid::broker::PersistableExchange& queue,
const qpid::framing::FieldTable& args);
+
void destroy(const qpid::broker::PersistableExchange& queue);
- void bind(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key, const qpid::framing::FieldTable& args);
- void unbind(const qpid::broker::PersistableExchange& exchange,
- const qpid::broker::PersistableQueue& queue,
- const std::string& key, const qpid::framing::FieldTable& args);
+ void bind(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
+ void unbind(const qpid::broker::PersistableExchange& exchange,
+ const qpid::broker::PersistableQueue& queue,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
+
void create(const qpid::broker::PersistableConfig& config);
+
void destroy(const qpid::broker::PersistableConfig& config);
void recover(qpid::broker::RecoveryManager& queues);
void stage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+
void destroy(qpid::broker::PersistableMessage& msg);
- void appendContent(boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, const std::string& data);
+
+ void appendContent(boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ const std::string& data);
+
void loadContent(const qpid::broker::PersistableQueue& queue,
- boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
- std::string& data, u_int64_t offset, u_int32_t length);
+ boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
+ std::string& data,
+ u_int64_t offset,
+ u_int32_t length);
- void enqueue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
- void dequeue(qpid::broker::TransactionContext* ctxt,
- boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
- const qpid::broker::PersistableQueue& queue);
+ void enqueue(qpid::broker::TransactionContext* ctxt,
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
+
+ void dequeue(qpid::broker::TransactionContext* ctxt,
+ boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
+ const qpid::broker::PersistableQueue& queue);
+
void flush(const qpid::broker::PersistableQueue& queue);
u_int32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& queue);
-
void collectPreparedXids(std::set<std::string>& xids);
std::auto_ptr<qpid::broker::TransactionContext> begin();
@@ -239,10 +314,12 @@
qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return mgmtObject; }
- qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
+ inline qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&)
{ return qpid::management::Manageable::STATUS_OK; }
- };
- }
-}
+ }; // class BdbMessageStore
+
+ } // namespace bdbstore
+} // namespace rhm
+
#endif
Modified: store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -166,8 +166,7 @@
if (prep_tx_list_ptr) {
// Create list of prepared xids
std::vector<std::string> prep_xid_list;
- for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin();
- i != prep_tx_list_ptr->end(); i++) {
+ for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
prep_xid_list.push_back(i->xid);
}
@@ -347,9 +346,9 @@
}
void
-JournalImpl::dequeue_data_record(data_tok* const dtokp)
+JournalImpl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit)
{
- handleIoResult(jcntl::dequeue_data_record(dtokp));
+ handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit));
if (_mgmtObject.get() != 0)
{
@@ -359,9 +358,9 @@
}
void
-JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
+JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
{
- handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid));
+ handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, txn_coml_commit));
if (_mgmtObject.get() != 0)
{
Modified: store/branches/mrg-1.0/cpp/lib/JournalImpl.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/JournalImpl.h 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/JournalImpl.h 2008-07-14 17:05:31 UTC (rev 2198)
@@ -156,9 +156,9 @@
void enqueue_extern_txn_data_record(const size_t tot_data_len, journal::data_tok* dtokp,
const std::string& xid, const bool transient = false);
- void dequeue_data_record(journal::data_tok* const dtokp);
+ void dequeue_data_record(journal::data_tok* const dtokp, const bool txn_coml_commit = false);
- void dequeue_txn_data_record(journal::data_tok* const dtokp, const std::string& xid);
+ void dequeue_txn_data_record(journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
void txn_abort(journal::data_tok* const dtokp, const std::string& xid);
Modified: store/branches/mrg-1.0/cpp/lib/StorePlugin.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/StorePlugin.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/StorePlugin.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -49,19 +49,12 @@
if (options.storeDir.empty ())
{
if (!dataDir.isEnabled ())
- throw Exception ("If --data-dir is blank or --no-data-dir is specified, "
- "--store-directory must be present.");
+ throw Exception ("If --data-dir is blank or --no-data-dir is specified, --store-dir must be present.");
options.storeDir = dataDir.getPath ();
}
- if (!store->init (&options))
- {
- throw Exception("Existing journal found in different bdb/async mode. "
- "Move or delete existing data files before changing modes, or use "
- "'--store-force yes' to discard existing data.");
- }
-
+ store->init(&options);
broker->setStore (store);
}
Modified: store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/TxnCtxt.h 2008-07-14 17:05:31 UTC (rev 2198)
@@ -47,11 +47,11 @@
class TxnCtxt : public qpid::broker::TransactionContext
{
-protected:
-
+ protected:
static qpid::sys::Mutex globalSerialiser;
typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
+ typedef ipqdef::iterator ipqItr;
typedef std::auto_ptr<qpid::sys::Mutex::ScopedLock> AutoScopedLock;
ipqdef impactedQueues; // list of Queues used in the txn
@@ -66,31 +66,34 @@
std::string tid;
DbTxn* txn;
- void completeTXN(bool commit) {
+ virtual void completeTxn(bool commit) {
sync();
- for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
- JournalImpl* jc = static_cast<JournalImpl*>(*i);
- if (jc && loggedtx) { /* if using journal */
- boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
- dtokp->addRef();
- dtokp->set_external_rid(true);
- dtokp->set_rid(loggedtx->next());
- try {
- if (commit) {
- jc->txn_commit(dtokp.get(), getXid());
- jc->flush(true);
- } else {
- jc->txn_abort(dtokp.get(), getXid());
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
+ for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+ commitTxn(static_cast<JournalImpl*>(*i), commit);
+ }
+ impactedQueues.clear();
+ }
+
+ void commitTxn(JournalImpl* jc, bool commit) {
+ if (jc && loggedtx) { /* if using journal */
+ boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
+ dtokp->addRef();
+ dtokp->set_external_rid(true);
+ dtokp->set_rid(loggedtx->next());
+ try {
+ if (commit) {
+ jc->txn_commit(dtokp.get(), getXid());
+ jc->flush(true);
+ } else {
+ jc->txn_abort(dtokp.get(), getXid());
}
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
}
}
- deleteXidRecord();
}
-public:
+ public:
TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), txn(0) {
if (loggedtx) {
@@ -119,22 +122,25 @@
if (sleep_cnt > MAX_SYNC_SLEEPS) THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::sync()"));
if (!firstloop) { ::usleep(SYNC_SLEEP_TIME); sleep_cnt++; } // move this into the get events call aiolib..
allWritten = true;
- for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
- JournalImpl* jc = static_cast<JournalImpl*>(*i);
- try {
- if (jc && !(jc->is_txn_synced(getXid()))) {
- if (firstloop) jc->flush();
- allWritten = false;
- jc->get_wr_events();
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
- }
+ for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+ sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
}
firstloop = false;
}
}
+ void sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten) {
+ try {
+ if (jc && !(jc->is_txn_synced(getXid()))) {
+ if (firstloop) jc->flush();
+ allWritten = false;
+ jc->get_wr_events();
+ }
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
+ }
+ }
+
void begin(DbEnv& env, bool sync = false) {
env.txn_begin(0, &txn, 0);
if (sync) globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser));
@@ -160,9 +166,8 @@
virtual bool isTPC() { return false; }
virtual const std::string& getXid() { return tid; }
- void deleteXidRecord() { impactedQueues.clear(); }
void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
- void complete(bool commit) { completeTXN(commit); }
+ void complete(bool commit) { completeTxn(commit); }
DataTokenImpl* getDtok() { return dtokp.get(); }
void incrDtokRef() { dtokp->addRef(); }
void recoverDtok(const u_int64_t rid, const std::string xid) {
@@ -175,11 +180,29 @@
class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
{
+ protected:
const std::string xid;
-public:
- TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
- virtual bool isTPC() { return true; }
- virtual const std::string& getXid() { return xid; }
+ JournalImpl* preparedXidStorePtr;
+ virtual void completeTxn(bool commit) {
+ TxnCtxt::completeTxn(commit);
+ if (preparedXidStorePtr) commitTxn(preparedXidStorePtr, commit);
+ }
+
+ public:
+ TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid), preparedXidStorePtr(0) {}
+ void sync() {
+ TxnCtxt::sync();
+ bool allWritten = false;
+ if (preparedXidStorePtr) {
+ while (!allWritten) {
+ allWritten = true;
+ sync_jrnl(preparedXidStorePtr, true, allWritten);
+ }
+ }
+ }
+ inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
+ inline virtual bool isTPC() { return true; }
+ inline virtual const std::string& getXid() { return xid; }
};
}}
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/deq_hdr.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/deq_hdr.hpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/deq_hdr.hpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -87,6 +87,7 @@
#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
u_int32_t _filler0; ///< Little-endian filler for 32-bit size_t
#endif
+ static const u_int16_t DEQ_HDR_TXNCMPLCOMMIT_MASK = 0x10;
/**
* \brief Default constructor, which sets all values to 0.
@@ -105,7 +106,8 @@
* \brief Convenience constructor which initializes values during construction.
*/
inline deq_hdr(const u_int32_t magic, const u_int8_t version, const u_int64_t rid,
- const u_int64_t deq_rid, const std::size_t xidsize, const bool owi):
+ const u_int64_t deq_rid, const std::size_t xidsize, const bool owi,
+ const bool txn_coml_commit = false):
rec_hdr(magic, version, rid, owi), _deq_rid(deq_rid),
#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
_filler0(0),
@@ -114,8 +116,17 @@
#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
, _filler0(0)
#endif
- {}
+ { set_txn_coml_commit(txn_coml_commit); }
+
+ inline bool is_txn_coml_commit() const { return _uflag & DEQ_HDR_TXNCMPLCOMMIT_MASK; }
+
+ inline void set_txn_coml_commit(const bool commit)
+ {
+ _uflag = commit ? _uflag | DEQ_HDR_TXNCMPLCOMMIT_MASK :
+ _uflag & (~DEQ_HDR_TXNCMPLCOMMIT_MASK);
+ }
+
/**
* \brief Returns the size of the header in bytes.
*/
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -52,8 +52,8 @@
{}
deq_rec::deq_rec(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool owi):
- _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, rid, drid, xidlen, owi),
+ const std::size_t xidlen, const bool owi, const bool txn_coml_commit):
+ _deq_hdr(RHM_JDAT_DEQ_MAGIC, RHM_JDAT_VERSION, rid, drid, xidlen, owi, txn_coml_commit),
_xidp(xidp),
_buff(0),
_deq_tail(_deq_hdr)
@@ -68,6 +68,8 @@
deq_rec::reset()
{
_deq_hdr._rid = 0;
+ _deq_hdr.set_owi(false);
+ _deq_hdr.set_txn_coml_commit(false);
_deq_hdr._deq_rid = 0;
_deq_hdr._xidsize = 0;
_deq_tail._rid = 0;
@@ -77,10 +79,11 @@
void
deq_rec::reset(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool owi)
+ const std::size_t xidlen, const bool owi, const bool txn_coml_commit)
{
_deq_hdr._rid = rid;
_deq_hdr.set_owi(owi);
+ _deq_hdr.set_txn_coml_commit(txn_coml_commit);
_deq_hdr._deq_rid = drid;
_deq_hdr._xidsize = xidlen;
_deq_tail._rid = rid;
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.hpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/deq_rec.hpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -67,20 +67,21 @@
deq_rec();
// constructor used for write operations, where xid already exists
deq_rec(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool owi);
+ const std::size_t xidlen, const bool owi, const bool txn_coml_commit);
virtual ~deq_rec();
// Prepare instance for use in reading data from journal
void reset();
// Prepare instance for use in writing data to journal
void reset(const u_int64_t rid, const u_int64_t drid, const void* const xidp,
- const std::size_t xidlen, const bool owi);
+ const std::size_t xidlen, const bool owi, const bool txn_coml_commit);
u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks);
u_int32_t decode(rec_hdr& h, void* rptr, u_int32_t rec_offs_dblks,
u_int32_t max_size_dblks);
// Decode used for recover
bool rcv_decode(rec_hdr h, std::ifstream* ifsp, std::size_t& rec_offs);
+ inline bool is_txn_coml_commit() const { return _deq_hdr.is_txn_coml_commit(); }
inline u_int64_t rid() const { return _deq_hdr._rid; }
inline u_int64_t deq_rid() const { return _deq_hdr._deq_rid; }
std::size_t get_xid(void** const xidpp);
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -338,25 +338,25 @@
}
iores
-jcntl::dequeue_data_record(data_tok* const dtokp)
+jcntl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit)
{
check_wstatus("dequeue_data");
{
slock s(&_wr_mutex);
iores r;
- while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0), r, dtokp)) ;
+ while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, txn_coml_commit), r, dtokp)) ;
return r;
}
}
iores
-jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
+jcntl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
{
check_wstatus("dequeue_data");
{
slock s(&_wr_mutex);
iores r;
- while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size()), r, dtokp)) ;
+ while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), txn_coml_commit), r, dtokp)) ;
return r;
}
}
@@ -711,7 +711,8 @@
dr.get_xid(&xidp);
assert(xidp != 0);
std::string xid((char*)xidp, dr.xid_size());
- _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
+ _tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false,
+ dr.is_txn_coml_commit()));
_tmap.set_aio_compl(xid, dr.rid());
std::free(xidp);
}
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -428,10 +428,13 @@
*
* \param dtokp Pointer to data_tok instance for this data, used to track state of data
* through journal.
+ * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing
+ * prepared XID list items, sets whether the complete() was called in commit or abort
+ * mode.
*
* \exception TODO
*/
- iores dequeue_data_record(data_tok* const dtokp);
+ iores dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit = false);
/**
* \brief Dequeues (marks as no longer needed) data record in journal.
@@ -446,10 +449,13 @@
* through journal.
* \param xid String containing xid. An empty string (i.e. length=0) will be considered
* non-transactional.
+ * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing
+ * prepared XID list items, sets whether the complete() was called in commit or abort
+ * mode.
*
* \exception TODO
*/
- iores dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid);
+ iores dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
/**
* \brief Abort the transaction for all records enqueued or dequeued with the matching xid.
@@ -574,8 +580,7 @@
* \return <b><i>true</i></b> if the jouranl is ready to read and write data;
* <b><i>false</i></b> otherwise.
*/
- inline bool is_ready() const { return _init_flag and not _stop_flag; }
- inline bool is_init() const { return _init_flag; }
+ inline bool is_ready() const { return _init_flag && !_stop_flag; }
inline bool is_read_only() const { return _readonly_flag; }
@@ -602,7 +607,8 @@
inline u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
inline u_int32_t get_open_txn_cnt() const { return _tmap.size(); }
- void get_open_txn_list(std::vector<std::string>& xv) { _tmap.xid_list(xv); }
+ // TODO Make this a const, but txn_map must support const first.
+ inline txn_map& get_txn_map() { return _tmap; }
// Logging
virtual void log(log_level level, const std::string& log_stmt) const;
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -42,11 +42,12 @@
{
txn_data_struct::txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t fid,
- const bool enq_flag):
+ const bool enq_flag, const bool commit_flag):
_rid(rid),
_drid(drid),
_fid(fid),
_enq_flag(enq_flag),
+ _commit_flag(commit_flag),
_aio_compl(false)
{}
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.hpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/txn_map.hpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -64,9 +64,10 @@
u_int64_t _drid; ///< Dequeue record id for this operation
u_int16_t _fid; ///< File id, to be used when transferring to emap on commit
bool _enq_flag; ///< If true, enq op, otherwise deq op
+ bool _commit_flag; ///< (2PC transactions) Records 2PC complete c/a mode
bool _aio_compl; ///< Initially false, set to true when record AIO returns
txn_data_struct(const u_int64_t rid, const u_int64_t drid, const u_int16_t fid,
- const bool enq_flag);
+ const bool enq_flag, const bool commit_flag = false);
};
typedef txn_data_struct txn_data;
typedef std::vector<txn_data> txn_data_list;
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -256,7 +256,7 @@
}
iores
-wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len)
+wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len, const bool txn_coml_commit)
{
if (xid_len)
assert(xid_ptr != 0);
@@ -284,7 +284,7 @@
const bool ext_rid = dtokp->external_rid();
u_int64_t rid = (ext_rid | cont) ? dtokp->rid() : _wrfc.get_incr_rid();
u_int64_t dequeue_rid = (ext_rid | cont) ? dtokp->dequeue_rid() : dtokp->rid();
- _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len, _wrfc.owi());
+ _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len, _wrfc.owi(), txn_coml_commit);
if (!cont)
{
if (!ext_rid)
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.hpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.hpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -110,7 +110,8 @@
iores enqueue(const void* const data_buff, const std::size_t tot_data_len,
const std::size_t this_data_len, data_tok* dtokp, const void* const xid_ptr,
const std::size_t xid_len, const bool transient, const bool external);
- iores dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
+ iores dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len,
+ const bool txn_coml_commit);
iores abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
iores commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len);
iores flush();
Modified: store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -30,9 +30,9 @@
#include "BdbMessageStore.h"
#include <iostream>
#include "MessageUtils.h"
-#include <qpid/framing/AMQHeaderBody.h>
-#include <qpid/broker/Queue.h>
-#include <qpid/broker/RecoveryManagerImpl.h>
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/framing/AMQHeaderBody.h"
using namespace rhm::bdbstore;
using namespace qpid::broker;
@@ -45,99 +45,171 @@
const char* tdp = getenv("TMPDIR");
const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/TransactionalTest");
+// Test txn context which has a special setCompleteFailure() method which prevents the entire "txn complete" process from happening
+class TestTxnCtxt : public TxnCtxt
+{
+ public:
+ TestTxnCtxt(IdSequence* _loggedtx) : TxnCtxt(_loggedtx) {}
+ void setCompleteFailure(const unsigned num_queues_rem) {
+ // Remove queue members from the beginning of impactedQueues until num_queues_rem remain,
+ // to simulate a multi-queue txn complete failure.
+ while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin());
+ }
+};
+
+// Test store which has a special begin() which returns a TestTxnCtxt, and a method to check for
+// remaining open transactions
+class TestMessageStore: public BdbMessageStore
+{
+ public:
+ TestMessageStore(const char* envpath = 0) : BdbMessageStore(envpath) {}
+ std::auto_ptr<qpid::broker::TransactionContext> begin() {
+ checkInit();
+ // pass sequence number for c/a
+ return auto_ptr<TransactionContext>(new TestTxnCtxt(&messageIdSequence));
+ }
+};
+
// === Helper fns ===
const string nameA("queueA");
const string nameB("queueB");
-const Uuid messageId(true);
+//const Uuid messageId(true);
std::auto_ptr<BdbMessageStore> store;
-QueueRegistry queues;
+std::auto_ptr<QueueRegistry> queues;
Queue::shared_ptr queueA;
Queue::shared_ptr queueB;
+template <class T>
void setup()
{
- store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
+ store = std::auto_ptr<T>(new T());
store->init(test_dir, 4, 1, 8);
store->truncate();
//create two queues:
+ FieldTable settings;
queueA = Queue::shared_ptr(new Queue(nameA, 0, store.get(), 0));
- FieldTable settings;
queueA->create(settings);
queueB = Queue::shared_ptr(new Queue(nameB, 0, store.get(), 0));
queueB->create(settings);
-
- //create message and enqueue it onto first queue:
- boost::intrusive_ptr<Message> msg = MessageUtils::createMessage("exchange", "routing_key", messageId, 0);
- msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
-
- queueA->deliver(msg);
}
+template <class T>
void restart()
{
queueA.reset();
queueB.reset();
+ queues.reset();
store.reset();
- store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
+ store = std::auto_ptr<T>(new T());
store->init(test_dir, 4, 1, 8);
+ queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
ExchangeRegistry exchanges;
LinkRegistry links(0);
DtxManager mgr;
mgr.setStore (store.get());
- RecoveryManagerImpl recovery(queues, exchanges, links, mgr, 0);
+ RecoveryManagerImpl recovery(*queues, exchanges, links, mgr, 0);
store->recover(recovery);
- queueA = queues.find(nameA);
- queueB = queues.find(nameB);
+ queueA = queues->find(nameA);
+ queueB = queues->find(nameB);
}
-void check(bool swapped)
+boost::intrusive_ptr<Message> createMessage(const string& id, const string& exchange="exchange", const string& key="routing_key")
+{
+ boost::intrusive_ptr<Message> msg = MessageUtils::createMessage(exchange, key);
+ msg->getProperties<MessageProperties>()->setCorrelationId(id);
+ msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
+ return msg;
+}
+
+void checkMsg(Queue::shared_ptr& queue, u_int32_t size, const string& msgid = "<none>")
{
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL(size, queue->getMessageCount());
+ if (size > 0) {
+ boost::intrusive_ptr<Message> msg = queue->dequeue().payload;
+ BOOST_REQUIRE(msg);
+ BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());
+ }
+}
+
+void swap(bool commit)
+{
+ setup<BdbMessageStore>();
+
+ //create message and enqueue it onto first queue:
+ boost::intrusive_ptr<Message> msgA = createMessage("Message", "exchange", "routing_key");
+ queueA->deliver(msgA);
+
+ boost::intrusive_ptr<Message> msgB = queueA->dequeue().payload;
+ BOOST_REQUIRE(msgB);
+ //move the message from one queue to the other as a transaction
+ std::auto_ptr<TransactionContext> txn = store->begin();
+ queueB->enqueue(txn.get(), msgB);//note: need to enqueue it first to avoid message being deleted
+ queueA->dequeue(txn.get(), msgB);
+ if (commit) {
+ store->commit(*txn);
+ } else {
+ store->abort(*txn);
+ }
+
+ restart<BdbMessageStore>();
+
+ // Check outcome
BOOST_REQUIRE(queueA);
BOOST_REQUIRE(queueB);
- Queue::shared_ptr x;//the other queue
+ Queue::shared_ptr x;//the queue from which the message was swapped
Queue::shared_ptr y;//the queue on which the message is expected to be
- if (swapped) {
+ if (commit) {
x = queueA;
y = queueB;
} else {
x = queueB;
y = queueA;
}
-
- BOOST_CHECK_EQUAL((u_int32_t) 0, x->getMessageCount());
- BOOST_CHECK_EQUAL((u_int32_t) 1, y->getMessageCount());
- boost::intrusive_ptr<Message> msg = y->dequeue().payload;
- BOOST_REQUIRE(msg);
- BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
+
+ checkMsg(x, 0);
+ checkMsg(y, 1, "Message");
+ checkMsg(y, 0);
}
-void swap(bool commit)
+void testMultiQueueTxn(const unsigned num_queues_rem, const bool commit)
{
- setup();
+ setup<TestMessageStore>();
+ std::auto_ptr<TransactionContext> txn(static_cast<TestMessageStore*>(store.get())->begin());
- boost::intrusive_ptr<Message> msg = queueA->dequeue().payload;
- BOOST_REQUIRE(msg);
- //move the message from one queue to the other as a transaction
- std::auto_ptr<TransactionContext> txn = store->begin();
- queueB->enqueue(txn.get(), msg);//note: need to enqueue it first to avoid message being deleted
- queueA->dequeue(txn.get(), msg);
- if (commit) {
- store->commit(*txn);
- } else {
+ //create two messages and enqueue them onto both queues:
+ boost::intrusive_ptr<Message> msgA = createMessage("MessageA", "exchange", "routing_key");
+ queueA->enqueue(txn.get(), msgA);
+ queueB->enqueue(txn.get(), msgA);
+ boost::intrusive_ptr<Message> msgB = createMessage("MessageB", "exchange", "routing_key");
+ queueA->enqueue(txn.get(), msgB);
+ queueB->enqueue(txn.get(), msgB);
+
+ static_cast<TestTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem);
+ if (commit)
+ store->commit(*txn);
+ else
store->abort(*txn);
+ restart<TestMessageStore>();
+
+ // Check outcome
+ if (commit)
+ {
+ checkMsg(queueA, 2, "MessageA");
+ checkMsg(queueB, 2, "MessageA");
+ checkMsg(queueA, 1, "MessageB");
+ checkMsg(queueB, 1, "MessageB");
}
-
- restart();
- check(commit);
+ checkMsg(queueA, 0);
+ checkMsg(queueB, 0);
}
-
// === Test suite ===
QPID_AUTO_TEST_CASE(Commit)
@@ -153,5 +225,47 @@
swap(false);
cout << "ok" << endl;
}
+/*
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush;
+ testMultiQueueTxn(0, true);
+ cout << "ok" << endl;
+}
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush;
+ testMultiQueueTxn(0, false);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush;
+ testMultiQueueTxn(1, true);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush;
+ testMultiQueueTxn(1, false);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush;
+ testMultiQueueTxn(2, true);
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush;
+ testMultiQueueTxn(2, false);
+ cout << "ok" << endl;
+}
+*/
QPID_AUTO_TEST_SUITE_END()
Modified: store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-14 14:53:45 UTC (rev 2197)
+++ store/branches/mrg-1.0/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-14 17:05:31 UTC (rev 2198)
@@ -30,9 +30,11 @@
#include "BdbMessageStore.h"
#include <iostream>
#include "MessageUtils.h"
-#include <qpid/framing/AMQHeaderBody.h>
-#include <qpid/broker/Queue.h>
-#include <qpid/broker/RecoveryManagerImpl.h>
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/log/Statement.h"
+#include "TxnCtxt.h"
using namespace rhm::bdbstore;
using namespace qpid::broker;
@@ -67,8 +69,8 @@
public:
Swap(TwoPhaseCommitTest* const test_, const string& messageId_): test(test_), messageId(messageId_) {}
void init(){ msg = test->deliver(messageId, test->queueA); }
- void run(TPCTransactionContext* txn) { test->swap(txn); }
- void check(bool committed) { test->swapCheck(committed, messageId); }
+ void run(TPCTransactionContext* txn) { test->swap(txn, test->queueA, test->queueB); }
+ void check(bool committed) { test->swapCheck(committed, messageId, test->queueA, test->queueB); }
};
class Enqueue : public Strategy
@@ -81,17 +83,17 @@
Enqueue(TwoPhaseCommitTest* const test_): test(test_) {}
void init() {}
void run(TPCTransactionContext* txn) {
- msg1 = test->enqueue(txn, "Enqueue1");
- msg2 = test->enqueue(txn, "Enqueue2");
- msg3 = test->enqueue(txn, "Enqueue3");
+ msg1 = test->enqueue(txn, "Enqueue1", test->queueA);
+ msg2 = test->enqueue(txn, "Enqueue2", test->queueA);
+ msg3 = test->enqueue(txn, "Enqueue3", test->queueA);
}
void check(bool committed) {
if (committed) {
- test->checkA(3, "Enqueue1");
- test->checkA(2, "Enqueue2");
- test->checkA(1, "Enqueue3");
+ test->checkMsg(test->queueA, 3, "Enqueue1");
+ test->checkMsg(test->queueA, 2, "Enqueue2");
+ test->checkMsg(test->queueA, 1, "Enqueue3");
}
- test->checkA(0);
+ test->checkMsg(test->queueA, 0);
}
};
@@ -109,26 +111,94 @@
msg3 = test->deliver("Dequeue3", test->queueA);
}
void run(TPCTransactionContext* txn) {
- test->dequeue(txn);
- test->dequeue(txn);
- test->dequeue(txn);
+ test->dequeue(txn, test->queueA);
+ test->dequeue(txn, test->queueA);
+ test->dequeue(txn, test->queueA);
}
void check(bool committed) {
if (!committed) {
- test->checkA(3, "Dequeue1");
- test->checkA(2, "Dequeue2");
- test->checkA(1, "Dequeue3");
+ test->checkMsg(test->queueA, 3, "Dequeue1");
+ test->checkMsg(test->queueA, 2, "Dequeue2");
+ test->checkMsg(test->queueA, 1, "Dequeue3");
}
- test->checkA(0);
+ test->checkMsg(test->queueA, 0);
}
};
+ class MultiQueueTxn : public Strategy
+ {
+ TwoPhaseCommitTest* const test;
+ boost::intrusive_ptr<Message> msg1;
+ boost::intrusive_ptr<Message> msg2;
+ std::set<Queue::shared_ptr> queueset;
+ public:
+ MultiQueueTxn(TwoPhaseCommitTest* const test_): test(test_) {}
+ virtual void init() {}
+ virtual void run(TPCTransactionContext* txn) {
+ queueset.insert(test->queueA);
+ queueset.insert(test->queueB);
+ msg1 = test->enqueue(txn, "Message1", queueset);
+ msg2 = test->enqueue(txn, "Message2", queueset);
+ queueset.clear();
+ }
+ virtual void check(bool committed) {
+ TestMessageStore* sptr = static_cast<TestMessageStore*>(test->store.get());
+ if (committed)
+ {
+ test->checkMsg(test->queueA, 2, "Message1");
+ test->checkMsg(test->queueB, 2, "Message1");
+ test->checkMsg(test->queueA, 1, "Message2");
+ test->checkMsg(test->queueB, 1, "Message2");
+ }
+ test->checkMsg(test->queueA, 0);
+ test->checkMsg(test->queueB, 0);
+ // Check there are no remaining open txns in store
+ BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingTxns(*(test->queueA)));
+ BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingTxns(*(test->queueB)));
+ BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingPreparedListTxns());
+ }
+ };
+
+ // Test txn context which has a special setCompleteFailure() method which prevents the entire "txn complete" process from happening
+ class TestTPCTxnCtxt : public TPCTxnCtxt
+ {
+ public:
+ TestTPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TPCTxnCtxt(_xid, _loggedtx) {}
+ void setCompleteFailure(const unsigned num_queues_rem, const bool complete_prepared_list) {
+ // Remove queue members from the beginning of impactedQueues until num_queues_rem remain,
+ // to simulate a multi-queue txn complete failure.
+ while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin());
+ // If prepared list is not to be committed, set pointer to 0
+ if (!complete_prepared_list) preparedXidStorePtr = 0;
+ }
+ };
+
+ // Test store which has a special begin() which returns a TestTPCTxnCtxt, and a method to check for
+ // remaining open transactions
+ class TestMessageStore: public BdbMessageStore
+ {
+ public:
+ TestMessageStore(const char* envpath = 0) : BdbMessageStore(envpath) {}
+ std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid) {
+ checkInit();
+ IdSequence* jtx = &messageIdSequence;
+ // pass sequence number for c/a
+ return auto_ptr<TPCTransactionContext>(new TestTPCTxnCtxt(xid, jtx));
+ }
+ u_int32_t getRemainingTxns(const PersistableQueue& queue) {
+ return static_cast<JournalImpl*>(queue.getExternalQueueStore())->get_open_txn_cnt();
+ }
+ u_int32_t getRemainingPreparedListTxns() {
+ return preparedXidStorePtr->get_open_txn_cnt();
+ }
+ };
+
const string nameA;
const string nameB;
std::auto_ptr<BdbMessageStore> store;
std::auto_ptr<DtxManager> dtxmgr;
- QueueRegistry queues;
- LinkRegistry links;
+ std::auto_ptr<QueueRegistry> queues;
+ std::auto_ptr<LinkRegistry> links;
Queue::shared_ptr queueA;
Queue::shared_ptr queueB;
boost::intrusive_ptr<Message> msg1;
@@ -137,14 +207,14 @@
void recoverPrepared(bool commit)
{
- setup();
+ setup<BdbMessageStore>();
Swap swap(this, "RecoverPrepared");
swap.init();
std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
swap.run(txn.get());
store->prepare(*txn);
- restart();
+ restart<BdbMessageStore>();
//check that the message is not available from either queue
BOOST_CHECK_EQUAL((u_int32_t) 0, queueA->getMessageCount());
@@ -158,58 +228,89 @@
}
swap.check(commit);
- restart();
+ restart<BdbMessageStore>();
swap.check(commit);
}
+
+ void testMultiQueueTxn(const unsigned num_queues_rem, const bool complete_prepared_list, const bool commit)
+ {
+ setup<TestMessageStore>();
+ MultiQueueTxn mqtTest(this);
+ mqtTest.init();
+ std::auto_ptr<TPCTransactionContext> txn(static_cast<TestMessageStore*>(store.get())->begin("my-xid"));
+ mqtTest.run(txn.get());
+ store->prepare(*txn);
+ // As the commits and aborts should happen through DtxManager, and it is too complex to
+ // pass all these test params through, we bypass DtxManager and use the store directly.
+ // This will prevent the queues from seeing committed txns, however. To test the success
+ // or failure of
+ static_cast<TestTPCTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem, complete_prepared_list);
+ if (commit)
+ store->commit(*txn);
+ else
+ store->abort(*txn);
+ restart<TestMessageStore>();
+ mqtTest.check(commit);
+ }
+
void commit(Strategy& strategy)
{
- setup();
+ setup<BdbMessageStore>();
strategy.init();
std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
strategy.run(txn.get());
store->prepare(*txn);
- store->commit(*txn);
- restart();
+ store->commit(*txn);
+ restart<BdbMessageStore>();
strategy.check(true);
}
void abort(Strategy& strategy, bool prepare)
{
- setup();
+ setup<BdbMessageStore>();
strategy.init();
std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
strategy.run(txn.get());
if (prepare) store->prepare(*txn);
store->abort(*txn);
- restart();
+ restart<BdbMessageStore>();
strategy.check(false);
}
- void swap(TPCTransactionContext* txn)
+ void swap(TPCTransactionContext* txn, Queue::shared_ptr& from, Queue::shared_ptr& to)
{
- msg1 = queueA->dequeue().payload;//just dequeues in memory
+ msg1 = from->dequeue().payload;//just dequeues in memory
//move the message from one queue to the other as part of a
//distributed transaction
- queueB->enqueue(txn, msg1);//note: need to enqueue it first to avoid message being deleted
- queueA->dequeue(txn, msg1);
+ to->enqueue(txn, msg1);//note: need to enqueue it first to avoid message being deleted
+ from->dequeue(txn, msg1);
}
- void dequeue(TPCTransactionContext* txn)
+ void dequeue(TPCTransactionContext* txn, Queue::shared_ptr& queue)
{
- msg2 = queueA->dequeue().payload;//just dequeues in memory
- queueA->dequeue(txn, msg2);
+ msg2 = queue->dequeue().payload;//just dequeues in memory
+ queue->dequeue(txn, msg2);
}
- boost::intrusive_ptr<Message> enqueue(TPCTransactionContext* txn, const string& msgid)
+ boost::intrusive_ptr<Message> enqueue(TPCTransactionContext* txn, const string& msgid, Queue::shared_ptr& queue)
{
- boost::intrusive_ptr<Message> msg = createMessage(msgid);
- queueA->enqueue(txn, msg);
+ boost::intrusive_ptr<Message> msg = createMessage(msgid);
+ queue->enqueue(txn, msg);
return msg;
}
+ boost::intrusive_ptr<Message> enqueue(TPCTransactionContext* txn, const string& msgid, std::set<Queue::shared_ptr>& queueset)
+ {
+ boost::intrusive_ptr<Message> msg = createMessage(msgid);
+ for (std::set<Queue::shared_ptr>::iterator i = queueset.begin(); i != queueset.end(); i++) {
+ (*i)->enqueue(txn, msg);
+ }
+ return msg;
+ }
+
boost::intrusive_ptr<Message> deliver(const string& msgid, Queue::shared_ptr& queue)
{
msg4 = createMessage(msgid);
@@ -217,9 +318,10 @@
return msg4;
}
+ template <class T>
void setup()
{
- store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
+ store = std::auto_ptr<T>(new T());
store->init(test_dir, 4, 1, 8);
store->truncate();
@@ -239,60 +341,63 @@
return msg;
}
+ template <class T>
void restart()
{
queueA.reset();
queueB.reset();
store.reset();
+ queues.reset();
+ links.reset();
- store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
+ store = std::auto_ptr<T>(new T());
store->init(test_dir, 4, 1, 8);
ExchangeRegistry exchanges;
+ queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
+ links = std::auto_ptr<LinkRegistry>(new LinkRegistry(0));
dtxmgr = std::auto_ptr<DtxManager>(new DtxManager);
dtxmgr->setStore (store.get());
- RecoveryManagerImpl recovery(queues, exchanges, links, *dtxmgr, 0);
+ RecoveryManagerImpl recovery(*queues, exchanges, *links, *dtxmgr, 0);
store->recover(recovery);
- queueA = queues.find(nameA);
- queueB = queues.find(nameB);
+ queueA = queues->find(nameA);
+ queueB = queues->find(nameB);
}
- void swapCheck(bool swapped, const string& msgid)
+ void checkMsg(Queue::shared_ptr& queue, u_int32_t size, const string& msgid = "<none>")
{
- BOOST_REQUIRE(queueA);
- BOOST_REQUIRE(queueB);
+ BOOST_REQUIRE(queue);
+ BOOST_CHECK_EQUAL(size, queue->getMessageCount());
+ if (size > 0) {
+ boost::intrusive_ptr<Message> msg = queue->dequeue().payload;
+ BOOST_REQUIRE(msg);
+ BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());
+ }
+ }
+
+ void swapCheck(bool swapped, const string& msgid, Queue::shared_ptr& from, Queue::shared_ptr& to)
+ {
+ BOOST_REQUIRE(from);
+ BOOST_REQUIRE(to);
- Queue::shared_ptr x;//the other queue
- Queue::shared_ptr y;//the queue on which the message is expected to be
+ Queue::shared_ptr x; //the queue from which the message was swapped
+ Queue::shared_ptr y; //the queue on which the message is expected to be
if (swapped) {
- x = queueA;
- y = queueB;
+ x = from;
+ y = to;
} else {
- x = queueB;
- y = queueA;
+ x = to;
+ y = from;
}
- BOOST_CHECK_EQUAL((u_int32_t) 0, x->getMessageCount());
- BOOST_CHECK_EQUAL((u_int32_t) 1, y->getMessageCount());
- boost::intrusive_ptr<Message> msg = y->dequeue().payload;
- BOOST_REQUIRE(msg);
- BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());
+ checkMsg(x, 0);
+ checkMsg(y, 1, msgid);
+ checkMsg(y, 0);
}
- void checkA(u_int32_t size, const string& msgid = "<none>")
- {
- BOOST_REQUIRE(queueA);
- BOOST_CHECK_EQUAL(size, queueA->getMessageCount());
- if (size > 0) {
- boost::intrusive_ptr<Message> msg = queueA->dequeue().payload;
- BOOST_REQUIRE(msg);
- BOOST_CHECK_EQUAL(msgid, msg->getProperties<MessageProperties>()->getCorrelationId());
- }
- }
-
public:
- TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB"), links(0) {}
+ TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB") {}
void testCommitEnqueue()
{
@@ -357,6 +462,46 @@
{
recoverPrepared(false);
}
+
+ void testMultiQueueCommit()
+ {
+ testMultiQueueTxn(2, true, true);
+ }
+
+ void testMultiQueueAbort()
+ {
+ testMultiQueueTxn(2, true, false);
+ }
+
+ void testMultiQueueNoQueueCommitRecover()
+ {
+ testMultiQueueTxn(0, false, true);
+ }
+
+ void testMultiQueueNoQueueAbortRecover()
+ {
+ testMultiQueueTxn(0, false, false);
+ }
+
+ void testMultiQueueSomeQueueCommitRecover()
+ {
+ testMultiQueueTxn(1, false, true);
+ }
+
+ void testMultiQueueSomeQueueAbortRecover()
+ {
+ testMultiQueueTxn(1, false, false);
+ }
+
+ void testMultiQueueAllQueueCommitRecover()
+ {
+ testMultiQueueTxn(2, false, true);
+ }
+
+ void testMultiQueueAllQueueAbortRecover()
+ {
+ testMultiQueueTxn(2, false, false);
+ }
};
TwoPhaseCommitTest tpct;
@@ -440,4 +585,60 @@
cout << "ok" << endl;
}
+QPID_AUTO_TEST_CASE(MultiQueueCommit)
+{
+ cout << test_filename << ".MultiQueueCommit: " << flush;
+ tpct.testMultiQueueCommit();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAbort)
+{
+ cout << test_filename << ".MultiQueueAbort: " << flush;
+ tpct.testMultiQueueAbort();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush;
+ tpct.testMultiQueueNoQueueCommitRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush;
+ tpct.testMultiQueueNoQueueAbortRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush;
+ tpct.testMultiQueueSomeQueueCommitRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush;
+ tpct.testMultiQueueSomeQueueAbortRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
+{
+ cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush;
+ tpct.testMultiQueueAllQueueCommitRecover();
+ cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
+{
+ cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush;
+ tpct.testMultiQueueAllQueueAbortRecover();
+ cout << "ok" << endl;
+}
+
QPID_AUTO_TEST_SUITE_END()
More information about the rhmessaging-commits
mailing list