Author: kpvdr
Date: 2008-07-02 16:54:27 -0400 (Wed, 02 Jul 2008)
New Revision: 2173
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
store/trunk/cpp/tests/jrnl/_st_basic.cpp
store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
store/trunk/cpp/tests/jrnl/_st_read.cpp
store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
Log:
Moved prepared XID list from BDB to a journal instance in BdbMessageStore
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -22,20 +22,11 @@
*/
#include "BdbMessageStore.h"
-#include <qpid/broker/RecoveryManager.h>
-#include <qpid/broker/Message.h>
-#include <qpid/framing/Buffer.h>
-#include <qpid/log/Statement.h>
-#include <qpid/sys/Mutex.h>
-#include <algorithm>
-#include <iomanip>
-#include <sstream>
+
#include "BindingDbt.h"
+#include "BufferValue.h"
#include "IdPairDbt.h"
-#include "StringDbt.h"
-#include "JournalImpl.h"
-#include "DataTokenImpl.h"
-#include "qpid/management/ManagementAgent.h"
+#include "qpid/log/Statement.h"
#include "qpid/management/PackageMrgstore.h"
using namespace rhm::bdbstore;
@@ -63,9 +54,6 @@
mappingDb(&env, 0),
bindingDb(&env, 0),
generalDb(&env, 0),
- enqueueXidDb(&env, 0),
- dequeueXidDb(&env, 0),
- prepareXidDb(&env, 0),
numJrnlFiles(defNumJrnlFiles),
jrnlFsizePgs(defJrnlFileSizePgs),
wcache_pgsize_sblks(JRNL_WMGR_DEF_PAGE_SIZE),
@@ -125,12 +113,11 @@
if (dir.size()>0) storeDir = dir;
- string bdbdir = storeDir + "/rhm/dat/";
- journal::jdir::create_dir(bdbdir);
+ journal::jdir::create_dir(getBdbBaseDir());
+ journal::jdir::create_dir(getPxidBaseDir());
-
try {
- env.open(bdbdir.c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK |
DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
+ env.open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN |
DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
} catch (const DbException& e) {
if (e.get_errno() == DB_VERSION_MISMATCH)
THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of
bd4 does not match that which created the store database. "
@@ -150,10 +137,11 @@
open(mappingDb, txn.get(), "mappings.db", true);
open(bindingDb, txn.get(), "bindings.db", true);
open(generalDb, txn.get(), "general.db", false);
- open(enqueueXidDb, txn.get(), "enqueue_xid.db", true);
- open(dequeueXidDb, txn.get(), "dequeue_xid.db", true);
- open(prepareXidDb, txn.get(), "prepare_xid.db", false);
+ preparedXidStorePtr.reset(new JournalImpl("PreparedXidStore",
getPxidBaseDir(), "prepared_xid", defJournalGetEventsTimeout,
defJournalFlushTimeout));
txn.commit();
+ } catch (const journal::jexception& e) {
+ txn.abort();
+ THROW_STORE_EXCEPTION_2("Error opening preparedXidStore instance",
e.what());
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error opening databases", e);
@@ -167,6 +155,15 @@
return true;
}
+void BdbMessageStore::chkInitPreparedXidStore()
+{
+ if (!preparedXidStorePtr->is_init())
+ {
+ u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in
sblks. Currently 2048 sblks (1 MiB).
+ preparedXidStorePtr->initialize(defXidStoreNumJrnlFiles,
defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE, defTotWCacheSize / wcache_pgsize_sblks,
defXidStoreWCachePageSize);
+ }
+}
+
bool BdbMessageStore::init(const qpid::Options* options)
{
const Options* opts = static_cast<const Options*>(options);
@@ -253,11 +250,16 @@
for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
}
+ if (preparedXidStorePtr->is_ready()) preparedXidStorePtr->stop(true);
} catch (const DbException& e) {
- QPID_LOG(error, "Error closing databases: " << e.what());
+ QPID_LOG(error, "Error closing BDB databases: " << e.what());
+ } catch (const journal::jexception& e) {
+ QPID_LOG(error, "Error: " << e.what());
} catch (const std::exception& e) {
- QPID_LOG(error, e.what());
- } catch (...) {}
+ QPID_LOG(error, "Error: " << e.what());
+ } catch (...) {
+ QPID_LOG(error, "Unknown error in
BdbMessageStore::~BdbMessageStore()");
+ }
if (mgmtObject.get() != 0)
mgmtObject->resourceDestroy();
@@ -276,6 +278,7 @@
txn->commit(0);
try{
journal::jdir::delete_dir(getJrnlBaseDir(),true);
+ journal::jdir::delete_dir(getPxidBaseDir(),true);
}
catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
@@ -508,7 +511,7 @@
try
{
u_int64_t thisHighestRid = 0;
- jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE,
wcache_num_pages, wcache_pgsize_sblks, prepared, thisHighestRid, key.id); // start
recovery
+ jQueue->recover(numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE,
wcache_num_pages, wcache_pgsize_sblks, &prepared, thisHighestRid, key.id); // start
recovery
if (thisHighestRid > highestRid)
highestRid = thisHighestRid;
recoverMessages(txn, registry, queue, prepared, messages);
@@ -601,6 +604,10 @@
generalIdSequence.reset(maxGeneralId + 1);
}
+
+#define MAX_AIO_SLEEPS 1000 // ~1 second
+#define AIO_SLEEP_TIME 1000 // 1 millisecond
+
void BdbMessageStore::recoverMessages(TxnCtxt& /*txn*/,
qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr&
queue, txn_list& locked, message_index& prepared)
{
@@ -678,7 +685,7 @@
}
case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO");
+ THROW_STORE_EXCEPTION("Timeout waiting for AIO in
BdbMessageStore::recoverMessages()");
::usleep(AIO_SLEEP_TIME);
break;
case rhm::journal::RHM_IORES_EMPTY:
@@ -748,11 +755,11 @@
void BdbMessageStore::recoverXids(txn_list& txns)
{
- std::set<string> prepared;
- collectPreparedXids(prepared);
+ std::set<string> preparedXidSet;
+ collectPreparedXids(preparedXidSet);
- //when using the async journal, it will abort unprepaired xids and populate the
locked maps
- for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++)
{
+ // Abort unprepared xids and populate the locked maps
+ for (std::set<string>::iterator i = preparedXidSet.begin(); i !=
preparedXidSet.end(); i++) {
LockedMappings::shared_ptr enq_ptr;
enq_ptr.reset(new LockedMappings);
LockedMappings::shared_ptr deq_ptr;
@@ -789,7 +796,20 @@
void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
{
- readXids(prepareXidDb, xids);
+ if (journal::jdir::exists(storeDir + "/rhm/pxid/prepared_xid.jinf"))
+ {
+ u_int64_t highest_rid;
+ preparedXidStorePtr->recover(defXidStoreNumJrnlFiles,
defXidStoreJrnlFileSizePgs * JRNL_RMGR_PAGE_SIZE,
+ JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES / wcache_pgsize_sblks,
defXidStoreWCachePageSize,
+ 0, highest_rid, 0);
+
+ std::vector<std::string> xv;
+ preparedXidStorePtr->get_open_txn_list(xv);
+ for (std::vector<std::string>::const_iterator itr = xv.begin(); itr <
xv.end(); itr++)
+ xids.insert(*itr);
+
+ preparedXidStorePtr->recover_complete(); // start journal.
+ }
}
void BdbMessageStore::stage(const intrusive_ptr<PersistableMessage>& msg)
@@ -962,26 +982,18 @@
txn = check(ctxt);
} else {
txn = &implicit;
- txn->begin(env);
}
- try {
- bool newId = false;
- if (messageId == 0) {
- messageId = messageIdSequence.next();
- msg->setPersistenceId(messageId);
- newId = true;
- }
- store(&queue, txn, key, msg, newId);
+ bool newId = false;
+ if (messageId == 0) {
+ messageId = messageIdSequence.next();
+ msg->setPersistenceId(messageId);
+ newId = true;
+ }
+ store(&queue, txn, key, msg, newId);
- // add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
-
- if (!ctxt) txn->commit();
- } catch (const std::exception& e) {
- if (!ctxt) txn->abort();
- throw;
- }
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
}
void BdbMessageStore::store(const PersistableQueue* queue,
@@ -1056,25 +1068,13 @@
txn = check(ctxt);
} else {
txn = &implicit;
- txn->begin(env);
}
- try {
-
- // add queue* to the txn map..
- if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- async_dequeue(ctxt, msg, queue);
+ // add queue* to the txn map..
+ if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
+ async_dequeue(ctxt, msg, queue);
- msg->dequeueComplete();
- // if ( msg->isDequeueComplete() ) // clear id after last dequeue
- // msg->setPersistenceId(0);
-
- if (!ctxt) txn->commit();
-
- } catch (const std::exception& e) {
- if (!ctxt) txn->abort();
- throw;
- }
+ msg->dequeueComplete();
}
void BdbMessageStore::async_dequeue(
@@ -1145,17 +1145,15 @@
void BdbMessageStore::completed(TPCTxnCtxt& txn, bool commit)
{
- if (!txn.get()) txn.begin(env);
-
try {
+ // Nothing to do if not prepared
+ chkInitPreparedXidStore();
+ if (txn.getDtok().is_enqueued())
+ preparedXidStorePtr->dequeue_txn_data_record(&txn.getDtok(),
txn.getXid());
- StringDbt key(txn.getXid());
- prepareXidDb.del(txn.get(), &key, 0);
-
txn.complete(commit);
} catch (const std::exception& e) {
QPID_LOG(error, "Error completing xid " << txn.getXid() <<
": " << e.what());
- txn.abort();
throw;
}
}
@@ -1163,9 +1161,8 @@
auto_ptr<TransactionContext> BdbMessageStore::begin()
{
checkInit();
- // pass sequence number for c/a when using jrnl
+ // pass sequence number for c/a
TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
- txn->begin(env);
return auto_ptr<TransactionContext>(txn);
}
@@ -1175,9 +1172,8 @@
IdSequence* jtx = NULL;
jtx = &messageIdSequence;
- // pass sequence number for c/a when using jrnl
+ // pass sequence number for c/a
TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
- txn->begin(env);
return auto_ptr<TPCTransactionContext>(txn);
}
@@ -1188,18 +1184,14 @@
if(!txn) throw InvalidTransactionContextException();
try {
- u_int8_t dummy(1);
- string xid(txn->getXid());
- Dbt key ((void*) xid.data(), xid.length());
- Dbt value(&dummy, sizeof(dummy));
+ chkInitPreparedXidStore();
+ preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, &txn->getDtok(),
txn->getXid(), false);
+ txn->addXidRecord(preparedXidStorePtr.get());
// make sure all the data is written to disk before returning
txn->sync();
- prepareXidDb.put(txn->get(), &key, &value, 0);
-
- txn->commit();
} catch (const std::exception& e) {
- txn->abort();
+ QPID_LOG(error, "Error preparing xid " << txn->getXid()
<< ": " << e.what());
throw;
}
}
@@ -1211,7 +1203,7 @@
if (txn->isTPC()) {
completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
} else {
- txn->commit();
+ txn->complete(true);
}
}
@@ -1222,7 +1214,7 @@
if (txn->isTPC()) {
completed(*dynamic_cast<TPCTxnCtxt*>(txn), false);
} else {
- txn->abort();
+ txn->complete(false);
}
}
@@ -1328,10 +1320,24 @@
string BdbMessageStore::getJrnlBaseDir()
{
std::stringstream dir;
- dir << storeDir<< "/rhm/jrnl/" ;
+ dir << storeDir << "/rhm/jrnl/" ;
return dir.str();
}
+string BdbMessageStore::getBdbBaseDir()
+{
+ std::stringstream dir;
+ dir << storeDir << "/rhm/dat/" ;
+ return dir.str();
+}
+
+string BdbMessageStore::getPxidBaseDir()
+{
+ std::stringstream dir;
+ dir << storeDir << "/rhm/pxid/" ;
+ return dir.str();
+}
+
string BdbMessageStore::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for
exmaple /var/rhm/ + queueDir/
{
return getJrnlDir(queue.getName().c_str());
@@ -1372,5 +1378,3 @@
"Lower values decrease latency at the expense of throughput.")
;
}
-
-
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2008-07-02 20:54:27 UTC (rev 2173)
@@ -24,26 +24,30 @@
#ifndef _BdbMessageStore_
#define _BdbMessageStore_
+#include <string>
#include "db-inc.h"
-#include "BufferValue.h"
+//#include "BufferValue.h"
#include "Cursor.h"
#include "IdDbt.h"
-#include "IdSequence.h"
+//#include "IdSequence.h"
#include "PreparedTransaction.h"
-#include "StoreException.h"
+//#include "StoreException.h"
#include "TxnCtxt.h"
-#include <qpid/broker/Broker.h>
-#include <qpid/broker/MessageStore.h>
-#include <qpid/management/Manageable.h>
-#include <qpid/sys/Monitor.h>
-#include <qpid/sys/Time.h>
-#include <map>
-#include <set>
-#include <iostream>
-#include <boost/format.hpp>
-#include <boost/intrusive_ptr.hpp>
-#include <boost/ptr_container/ptr_list.hpp>
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/MessageStore.h"
+#include "qpid/management/Manageable.h"
+//#include <qpid/sys/Monitor.h>
+//#include <qpid/sys/Time.h>
+//#include <map>
+//#include <set>
+//#include <iostream>
+//#include <boost/format.hpp>
+//#include <boost/intrusive_ptr.hpp>
+//#include <boost/ptr_container/ptr_list.hpp>
#include "qpid/management/Store.h"
+#include "jrnl/jcfg.hpp"
+#include "JournalImpl.h"
+#include "IdSequence.h"
// Assume DB_VERSION_MAJOR == 4
#if (DB_VERSION_MINOR == 2)
@@ -68,9 +72,12 @@
typedef boost::ptr_list<PreparedTransaction> txn_list;
// Default store settings
- static const u_int16_t defNumJrnlFiles = 8; // TODO: make configurable
- static const u_int32_t defJrnlFileSizePgs = 24; // TODO: make configurable
- static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE *
JRNL_SBLK_SIZE / 1024; // TODO: make configurable
+ static const u_int16_t defNumJrnlFiles = 8;
+ static const u_int32_t defJrnlFileSizePgs = 24;
+ static const u_int32_t defWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE *
JRNL_SBLK_SIZE / 1024;
+ static const u_int16_t defXidStoreNumJrnlFiles = 8;
+ static const u_int32_t defXidStoreJrnlFileSizePgs = 24;
+ static const u_int32_t defXidStoreWCachePageSize = JRNL_WMGR_DEF_PAGE_SIZE *
JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
std::list<Db*> dbs;
DbEnv env;
@@ -81,9 +88,7 @@
Db mappingDb;
Db bindingDb;
Db generalDb;
- Db enqueueXidDb;
- Db dequeueXidDb;
- Db prepareXidDb;
+ boost::shared_ptr<JournalImpl> preparedXidStorePtr;
IdSequence queueIdSequence;
IdSequence exchangeIdSequence;
IdSequence generalIdSequence;
@@ -150,9 +155,12 @@
string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple
/var/rhm/ + queueDir/
string getJrnlDir(const char* queueName);
string getJrnlBaseDir();
+ string getBdbBaseDir();
+ string getPxidBaseDir();
inline void checkInit() {
if (!isInit) init("/var", defNumJrnlFiles, defJrnlFileSizePgs,
defWCachePageSize); isInit = true;
}
+ void chkInitPreparedXidStore();
public:
struct Options : public qpid::Options {
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -152,7 +152,7 @@
const u_int32_t wcache_pgsize_sblks,
const journal::rd_aio_cb rd_cb,
const journal::wr_aio_cb wr_cb,
- boost::ptr_list<bdbstore::PreparedTransaction>&
prep_tx_list,
+ boost::ptr_list<bdbstore::PreparedTransaction>*
prep_tx_list_ptr,
u_int64_t& highest_rid,
u_int64_t queue_id)
{
@@ -162,34 +162,42 @@
oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
oss1 << " wcache_num_pages=" << wcache_num_pages;
log(LOG_DEBUG, oss1.str());
- // Create list of prepared xids
- std::vector<std::string> prep_xid_list;
- for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
- i != prep_tx_list.end(); i++) {
- prep_xid_list.push_back(i->xid);
+
+ if (prep_tx_list_ptr) {
+ // Create list of prepared xids
+ std::vector<std::string> prep_xid_list;
+ for (bdbstore::PreparedTransaction::list::iterator i =
prep_tx_list_ptr->begin();
+ i != prep_tx_list_ptr->end(); i++) {
+ prep_xid_list.push_back(i->xid);
+ }
+
+ jcntl::recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
rd_cb, wr_cb,
+ &prep_xid_list, highest_rid);
+ } else {
+ jcntl::recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
rd_cb, wr_cb,
+ 0, highest_rid);
}
-
- jcntl::recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
rd_cb, wr_cb,
- prep_xid_list, highest_rid);
// Populate PreparedTransaction lists from _tmap
- for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
- i != prep_tx_list.end(); i++) {
- try {
- txn_data_list tdl = _tmap.get_tdata_list(i->xid);
- assert(tdl.size()); // should never be empty
- for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
- if (tdl_itr->_enq_flag) { // enqueue op
- i->enqueues->add(queue_id, tdl_itr->_rid);
- } else { // dequeue op
- i->dequeues->add(queue_id, tdl_itr->_drid);
+ if (prep_tx_list_ptr)
+ {
+ for (bdbstore::PreparedTransaction::list::iterator i =
prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
+ try {
+ txn_data_list tdl = _tmap.get_tdata_list(i->xid);
+ assert(tdl.size()); // should never be empty
+ for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
+ if (tdl_itr->_enq_flag) { // enqueue op
+ i->enqueues->add(queue_id, tdl_itr->_rid);
+ } else { // dequeue op
+ i->dequeues->add(queue_id, tdl_itr->_drid);
+ }
}
}
+ catch (const jexception& e) {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+ throw;
+ }
}
- catch (const jexception& e) {
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
- throw;
- }
}
std::ostringstream oss2;
oss2 << "Recover phase I complete; highest rid found = 0x" <<
std::hex << highest_rid;
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/lib/JournalImpl.h 2008-07-02 20:54:27 UTC (rev 2173)
@@ -119,7 +119,7 @@
const u_int32_t wcache_pgsize_sblks,
const journal::rd_aio_cb rd_cb,
const journal::wr_aio_cb wr_cb,
- boost::ptr_list<bdbstore::PreparedTransaction>&
prep_tx_list,
+ boost::ptr_list<bdbstore::PreparedTransaction>*
prep_tx_list_ptr,
u_int64_t& highest_rid,
u_int64_t queue_id);
@@ -127,11 +127,11 @@
const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
- boost::ptr_list<bdbstore::PreparedTransaction>&
prep_tx_list,
+ boost::ptr_list<bdbstore::PreparedTransaction>*
prep_tx_list_ptr,
u_int64_t& highest_rid,
u_int64_t queue_id) {
recover(num_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
0,
- &aio_wr_callback, prep_tx_list, highest_rid, queue_id);
+ &aio_wr_callback, prep_tx_list_ptr, highest_rid, queue_id);
}
void recover_complete();
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/lib/TxnCtxt.h 2008-07-02 20:54:27 UTC (rev 2173)
@@ -2,7 +2,7 @@
Copyright (C) 2007 Red Hat Software
This file is part of Red Hat Messaging.
-
+
Red Hat Messaging is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
@@ -24,38 +24,40 @@
#ifndef _TxnCtxt_
#define _TxnCtxt_
-#include "db-inc.h"
-#include <qpid/broker/MessageStore.h>
-#include <qpid/sys/Mutex.h>
-#include <boost/shared_ptr.hpp>
-#include <sstream>
-#include <memory>
-#include <vector>
-#include "JournalImpl.h"
-#include "DataTokenImpl.h"
#include <boost/format.hpp>
#include <boost/intrusive_ptr.hpp>
-#include <jrnl/jexception.hpp>
+#include <db-inc.h>
+#include <memory>
+#include <set>
+#include <sstream>
+#include <string>
+#include <unistd.h> // ::usleep()
+#include "DataTokenImpl.h"
+#include "IdSequence.h"
+#include "JournalImpl.h"
+#include "jrnl/jexception.hpp"
+#include "qpid/broker/PersistableQueue.h"
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/sys/Mutex.h"
+#include "StoreException.h"
+
namespace rhm{
namespace bdbstore{
-// find a better place to put these
-#define MAX_AIO_SLEEPS 1000
-#define AIO_SLEEP_TIME 1000
-
-
class TxnCtxt : public qpid::broker::TransactionContext
{
protected:
+
+ static qpid::sys::Mutex globalSerialiser;
+
typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
typedef std::auto_ptr<qpid::sys::Mutex::ScopedLock> AutoScopedLock;
- static qpid::sys::Mutex globalSerialiser;
-
ipqdef impactedQueues; // list of Queues used in the txn
mutable qpid::sys::Mutex Lock;
IdSequence* loggedtx;
+ DataTokenImpl dtok;
AutoScopedLock globalHolder;
/**
@@ -63,67 +65,70 @@
*/
std::string tid;
DbTxn* txn;
-
- void completeTXN(bool commit){
+
+ void completeTXN(bool commit) {
sync();
- for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i !=
impactedQueues.end(); i++) {
+ for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i !=
impactedQueues.end(); i++) {
JournalImpl* jc = static_cast<JournalImpl*>(*i);
if (jc && loggedtx) { /* if using journal */
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
dtokp->set_external_rid(true);
dtokp->set_rid(loggedtx->next());
- try{
+ try {
if (commit) {
jc->txn_commit(dtokp.get(), getXid());
jc->flush(true);
} else {
jc->txn_abort(dtokp.get(), getXid());
}
- } catch (const journal::jexception& e) {
+ } catch (const journal::jexception& e) {
//std::cout << "Error commit" << e <<
std::endl;
THROW_STORE_EXCEPTION(std::string("Error commit") +
e.what());
}
-
}
- }
+ }
deleteXidRecord();
}
-
+
public:
-
- TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0) {
- if (loggedtx){
- std::stringstream s;
- s << "rhm-tid" << this;
- tid.assign(s.str());
- }
+
+ TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0) {
+ if (loggedtx) {
+ std::stringstream s;
+ s << "rhm-tid" << this;
+ tid.assign(s.str());
+ }
}
-
+
/**
* Call to make sure all the data for this txn is written to safe store
*
*@return if the data sucessfully synced.
- */
- void sync(){
+ */
+
+ virtual ~TxnCtxt() { if(txn) abort(); }
+
+#define MAX_SYNC_SLEEPS 1000 // ~1 second
+#define SYNC_SLEEP_TIME 1000 // 1 millisecond
+
+ void sync() {
bool allWritten = false;
bool firstloop = true;
- while (loggedtx && !allWritten){
- if (!firstloop) ::usleep(AIO_SLEEP_TIME); // move this into the get events
call aiolib..
+ long sleep_cnt = 0L;
+ while (loggedtx && !allWritten) {
+ if (sleep_cnt > MAX_SYNC_SLEEPS)
THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for
TxnCtxt::sync()"));
+ if (!firstloop) { ::usleep(SYNC_SLEEP_TIME); sleep_cnt++; } // move this into
the get events call aiolib..
allWritten = true;
- for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i !=
impactedQueues.end(); i++) {
+ for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i !=
impactedQueues.end(); i++) {
JournalImpl* jc = static_cast<JournalImpl*>(*i);
-
- try
- {
- if (jc && !(jc->is_txn_synced(getXid())))
- {
- if (firstloop)
- jc->flush();
- allWritten = false;
- jc->get_wr_events();
- }
- } catch (const journal::jexception& e) {
+ try {
+ if (jc && !(jc->is_txn_synced(getXid()))) {
+ if (firstloop) jc->flush();
+ allWritten = false;
+ jc->get_wr_events();
+ }
+ } catch (const journal::jexception& e) {
//std::cout << " TxnCtxt: Error sync 3" << e
<< std::flush;
THROW_STORE_EXCEPTION(std::string("Error sync") +
e.what());
}
@@ -131,34 +136,36 @@
firstloop = false;
}
}
-
- virtual ~TxnCtxt() { if(txn) abort(); }
- void begin(DbEnv& env, bool sync = false){
- env.txn_begin(0, &txn, 0);
- if (sync) globalHolder = AutoScopedLock(new
qpid::sys::Mutex::ScopedLock(globalSerialiser));
+
+ void begin(DbEnv& env, bool sync = false) {
+ env.txn_begin(0, &txn, 0);
+ if (sync) globalHolder = AutoScopedLock(new
qpid::sys::Mutex::ScopedLock(globalSerialiser));
}
- void commit(){
- txn->commit(0);
- txn = 0;
- completeTXN(true);
- globalHolder.reset();
+
+ void commit() {
+ if (txn) {
+ txn->commit(0);
+ txn = 0;
+ globalHolder.reset();
+ }
}
- void abort(){
+
+ void abort(){
if (txn) {
- txn->abort();
- txn = 0;
- completeTXN(false);
- globalHolder.reset();
- }
+ txn->abort();
+ txn = 0;
+ globalHolder.reset();
+ }
}
- DbTxn* get(){ return txn; }
+
+ DbTxn* get() { return txn; }
virtual bool isTPC() { return false; }
virtual const std::string& getXid() { return tid; }
- void deleteXidRecord(){ impactedQueues.clear(); }
- void addXidRecord(qpid::broker::ExternalQueueStore* queue) {
- impactedQueues.insert(queue); }
-
+ void deleteXidRecord() { impactedQueues.clear(); }
+ void addXidRecord(qpid::broker::ExternalQueueStore* queue) {
impactedQueues.insert(queue); }
+ void complete(bool commit) { completeTXN(commit); }
+ DataTokenImpl& getDtok() { return dtok; }
};
class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
@@ -168,12 +175,6 @@
TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx),
xid(_xid) {}
virtual bool isTPC() { return true; }
virtual const std::string& getXid() { return xid; }
- // commit the BDB abort, abort commit the jnrl
- void commit(){ txn->commit(0); txn = 0; globalHolder.reset(); }
- void abort(){ txn->abort(); txn = 0; globalHolder.reset(); }
- void complete(bool commit){
- txn->commit(0); completeTXN(commit); txn = 0;
- }
};
}}
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -155,7 +155,7 @@
jcntl::recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
const rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
- const std::vector<std::string>& prep_txn_list, u_int64_t&
highest_rid)
+ const std::vector<std::string>* prep_txn_list_ptr, u_int64_t&
highest_rid)
{
_init_flag = false;
_stop_flag = false;
@@ -187,7 +187,7 @@
_jdir.verify_dir();
_rcvdat.reset(_num_jfiles);
- rcvr_janalyze(_rcvdat, prep_txn_list);
+ rcvr_janalyze(_rcvdat, prep_txn_list_ptr);
highest_rid = _rcvdat._h_rid;
if (_rcvdat._full)
throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl",
"recover");
@@ -574,7 +574,7 @@
}
void
-jcntl::rcvr_janalyze(rcvdat& rd, const std::vector<std::string>&
prep_txn_list)
+jcntl::rcvr_janalyze(rcvdat& rd, const std::vector<std::string>*
prep_txn_list_ptr)
{
jinf ji(_jdir.dirname() + "/" + _base_filename + "." +
JRNL_INFO_EXTENSION, true);
@@ -633,16 +633,16 @@
if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
rd._full = true;
- // Remove all transactions not in prep_txn_list
- std::vector<std::string> xid_list;
- _tmap.xid_list(xid_list);
- for (std::vector<std::string>::iterator itr = xid_list.begin(); itr !=
xid_list.end();
- itr++)
+ if (!rd._empty && prep_txn_list_ptr)
{
- std::vector<std::string>::const_iterator pitr =
std::find(prep_txn_list.begin(),
- prep_txn_list.end(), *itr);
- if (pitr == prep_txn_list.end())
- _tmap.get_remove_tdata_list(*itr);
+ std::vector<std::string> xid_list;
+ _tmap.xid_list(xid_list);
+ for (std::vector<std::string>::iterator itr = xid_list.begin(); itr !=
xid_list.end(); itr++)
+ {
+ std::vector<std::string>::const_iterator pitr =
std::find(prep_txn_list_ptr->begin(), prep_txn_list_ptr->end(), *itr);
+ if (pitr == prep_txn_list_ptr->end())
+ _tmap.get_remove_tdata_list(*itr);
+ }
}
}
}
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -210,7 +210,7 @@
* \param wcache_pgsize_sblks The size in sblks of each write cache page.
* \param rd_cb Function pointer to callback function for read operations. May be
0 (NULL).
* \param wr_cb Function pointer to callback function for write operations. May be
0 (NULL).
- * \param prep_txn_list
+ * \param prep_txn_list_ptr
* \param highest_rid Returns the highest rid found in the journal during recover
*
* \exception TODO
@@ -218,7 +218,7 @@
void recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
const rd_aio_cb rd_cb, const wr_aio_cb wr_cb,
- const std::vector<std::string>& prep_txn_list, u_int64_t&
highest_rid);
+ const std::vector<std::string>* prep_txn_list_ptr, u_int64_t&
highest_rid);
/**
* \brief Notification to the journal that recovery is complete and that normal
operation
@@ -575,6 +575,7 @@
* <b><i>false</i></b> otherwise.
*/
inline bool is_ready() const { return _init_flag and not _stop_flag; }
+ inline bool is_init() const { return _init_flag; }
inline bool is_read_only() const { return _readonly_flag; }
@@ -599,6 +600,9 @@
inline u_int16_t num_jfiles() const { return _num_jfiles; }
inline u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
+
+ inline u_int32_t get_open_txn_cnt() const { return _tmap.size(); }
+ void get_open_txn_list(std::vector<std::string>& xv) {
_tmap.xid_list(xv); }
// Logging
virtual void log(log_level level, const std::string& log_stmt) const;
@@ -644,7 +648,7 @@
/**
* \brief Analyze journal for recovery.
*/
- void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>&
prep_txn_list);
+ void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>*
prep_txn_list_ptr);
bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, bool&
lowi, rcvdat& rd);
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -967,8 +967,8 @@
else
{
std::ostringstream oss;
- oss << "op=" << _op_str[op] << "
index=" << _pg_index << " state=";
- oss << _page_cb_arr[_pg_index].state_str();
+ oss << "jrnl=" << _jc->id() << "
op=" << _op_str[op];
+ oss << " index=" << _pg_index << "
pg_state=" << _page_cb_arr[_pg_index].state_str();
throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr",
"pre_write_check");
}
}
@@ -988,8 +988,8 @@
if (!dtokp->is_writable())
{
std::ostringstream oss;
- oss << "op=" << _op_str[op] << "
dtok_id=" << dtokp->id();
- oss << " dtok_state=" <<
dtokp->wstate_str();
+ oss << "jrnl=" << _jc->id() << "
op=" << _op_str[op];
+ oss << " dtok_id=" << dtokp->id() <<
" dtok_state=" << dtokp->wstate_str();
throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(),
"wmgr",
"pre_write_check");
}
@@ -999,8 +999,8 @@
if (!dtokp->is_dequeueable())
{
std::ostringstream oss;
- oss << "op=" << _op_str[op] << "
dtok_id=" << dtokp->id();
- oss << " dtok_state=" << dtokp->wstate_str();
+ oss << "jrnl=" << _jc->id() << "
op=" << _op_str[op];
+ oss << " dtok_id=" << dtokp->id() <<
" dtok_state=" << dtokp->wstate_str();
throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(),
"wmgr",
"pre_write_check");
}
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -160,6 +160,9 @@
swap.check(commit);
restart();
swap.check(commit);
+
+ // this test leaves xids in the store
+ store->truncate();
}
void commit(Strategy& strategy)
@@ -294,52 +297,46 @@
public:
TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB"), links(0)
{}
- void testCommitSwap()
+ void testCommitEnqueue()
{
- Swap swap(this, "SwapMessageId");
- commit(swap);
+ Enqueue enqueue(this);
+ commit(enqueue);
}
- void testPrepareAndAbortSwap()
+ void testCommitDequeue()
{
- Swap swap(this, "SwapMessageId");
- abort(swap, true);
+ Dequeue dequeue(this);
+ commit(dequeue);
}
- void testAbortNoPrepareSwap()
+ void testCommitSwap()
{
Swap swap(this, "SwapMessageId");
- abort(swap, false);
+ commit(swap);
}
- void testCommitEnqueue()
- {
- Enqueue enqueue(this);
- commit(enqueue);
- }
-
void testPrepareAndAbortEnqueue()
{
Enqueue enqueue(this);
abort(enqueue, true);
}
- void testAbortNoPrepareEnqueue()
+ void testPrepareAndAbortDequeue()
{
- Enqueue enqueue(this);
- abort(enqueue, false);
+ Dequeue dequeue(this);
+ abort(dequeue, true);
}
- void testCommitDequeue()
+ void testPrepareAndAbortSwap()
{
- Dequeue dequeue(this);
- commit(dequeue);
+ Swap swap(this, "SwapMessageId");
+ abort(swap, true);
}
- void testPrepareAndAbortDequeue()
+ void testAbortNoPrepareEnqueue()
{
- Dequeue dequeue(this);
- abort(dequeue, true);
+ Enqueue enqueue(this);
+ abort(enqueue, false);
}
void testAbortNoPrepareDequeue()
@@ -348,6 +345,12 @@
abort(dequeue, false);
}
+ void testAbortNoPrepareSwap()
+ {
+ Swap swap(this, "SwapMessageId");
+ abort(swap, false);
+ }
+
void testRecoverPreparedThenCommitted()
{
recoverPrepared(true);
@@ -363,73 +366,73 @@
// === Test suite ===
-QPID_AUTO_TEST_CASE(PrepareAndAbortSwap)
+QPID_AUTO_TEST_CASE(CommitEnqueue)
{
- cout << test_filename << ".PrepareAndAbortSwap: " << flush;
- tpct.testPrepareAndAbortSwap();
+ cout << test_filename << ".CommitEnqueue: " << flush;
+ tpct.testCommitEnqueue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CommitEnqueue)
+QPID_AUTO_TEST_CASE(CommitDequeue)
{
- cout << test_filename << ".CommitEnqueue: " << flush;
- tpct.testCommitEnqueue();
+ cout << test_filename << ".CommitDequeue: " << flush;
+ tpct.testCommitDequeue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueue)
+QPID_AUTO_TEST_CASE(CommitSwap)
{
- cout << test_filename << ".AbortNoPrepareEnqueue: " << flush;
- tpct.testAbortNoPrepareEnqueue();
+ cout << test_filename << ".CommitSwap: " << flush;
+ tpct.testCommitSwap();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(PrepareAndAbortDequeue)
+QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueue)
{
- cout << test_filename << ".PrepareAndAbortDequeue: " << flush;
- tpct.testPrepareAndAbortDequeue();
+ cout << test_filename << ".PrepareAndAbortEnqueue: " << flush;
+ tpct.testPrepareAndAbortEnqueue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(RecoverPreparedThenCommitted)
+QPID_AUTO_TEST_CASE(PrepareAndAbortDequeue)
{
- cout << test_filename << ".RecoverPreparedThenCommitted: " << flush;
- tpct.testRecoverPreparedThenCommitted();
+ cout << test_filename << ".PrepareAndAbortDequeue: " << flush;
+ tpct.testPrepareAndAbortDequeue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CommitSwap)
+QPID_AUTO_TEST_CASE(PrepareAndAbortSwap)
{
- cout << test_filename << ".CommitSwap: " << flush;
- tpct.testCommitSwap();
+ cout << test_filename << ".PrepareAndAbortSwap: " << flush;
+ tpct.testPrepareAndAbortSwap();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(AbortNoPrepareSwap)
+QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueue)
{
- cout << test_filename << ".AbortNoPrepareSwap: " << flush;
- tpct.testAbortNoPrepareSwap();
+ cout << test_filename << ".AbortNoPrepareEnqueue: " << flush;
+ tpct.testAbortNoPrepareEnqueue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueue)
+QPID_AUTO_TEST_CASE(AbortNoPrepareDequeue)
{
- cout << test_filename << ".PrepareAndAbortEnqueue: " << flush;
- tpct.testPrepareAndAbortEnqueue();
+ cout << test_filename << ".AbortNoPrepareDequeue: " << flush;
+ tpct.testAbortNoPrepareDequeue();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(CommitDequeue)
+QPID_AUTO_TEST_CASE(AbortNoPrepareSwap)
{
- cout << test_filename << ".CommitDequeue: " << flush;
- tpct.testCommitDequeue();
+ cout << test_filename << ".AbortNoPrepareSwap: " << flush;
+ tpct.testAbortNoPrepareSwap();
cout << "ok" << endl;
}
-QPID_AUTO_TEST_CASE(AbortNoPrepareDequeue)
+QPID_AUTO_TEST_CASE(RecoverPreparedThenCommitted)
{
- cout << test_filename << ".AbortNoPrepareDequeue: " << flush;
- tpct.testAbortNoPrepareDequeue();
+ cout << test_filename << ".RecoverPreparedThenCommitted: " << flush;
+ tpct.testRecoverPreparedThenCommitted();
cout << "ok" << endl;
}
Modified: store/trunk/cpp/tests/jrnl/_st_basic.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_basic.cpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/tests/jrnl/_st_basic.cpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -144,25 +144,23 @@
BOOST_CHECK_EQUAL(jc.is_read_only(), false);
}
{
- vector<string> txn_list;
u_int64_t hrid;
test_jrnl jc(test_name, test_dir, test_name);
BOOST_CHECK_EQUAL(jc.is_ready(), false);
BOOST_CHECK_EQUAL(jc.is_read_only(), false);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(jc.is_ready(), true);
BOOST_CHECK_EQUAL(jc.is_read_only(), true);
BOOST_CHECK_EQUAL(hrid, u_int64_t(0));
}
{
- vector<string> txn_list;
u_int64_t hrid;
test_jrnl jc(test_name, test_dir, test_name);
BOOST_CHECK_EQUAL(jc.is_ready(), false);
BOOST_CHECK_EQUAL(jc.is_read_only(), false);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(jc.is_ready(), true);
BOOST_CHECK_EQUAL(jc.is_read_only(), true);
BOOST_CHECK_EQUAL(hrid, u_int64_t(0));
@@ -189,11 +187,10 @@
enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
}
{
- vector<string> txn_list;
u_int64_t hrid;
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
jc.recover_complete();
for (int m=0; m<NUM_MSGS; m++)
@@ -210,7 +207,6 @@
try
{
string msg;
- vector<string> txn_list;
u_int64_t hrid;
for (int m=0; m<2*NUM_MSGS; m+=2)
@@ -221,7 +217,7 @@
jc.initialize(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS); // First time only
else
{
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(m - 1));
jc.recover_complete();
}
@@ -229,7 +225,7 @@
}
{
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(m));
jc.recover_complete();
deq_msg(jc, m);
@@ -265,11 +261,10 @@
}
{
string msg;
- vector<string> txn_list;
u_int64_t hrid;
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
// Recover non-transient msgs
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
Modified: store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/tests/jrnl/_st_helper_fns.hpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -70,7 +70,7 @@
void initialize(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks)
{ jcntl::initialize(num_jfiles, jfsize_sblks, JRNL_WMGR_DEF_PAGES,
JRNL_WMGR_DEF_PAGE_SIZE,
0, &aio_wr_callback); }
- void recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks, vector<string>& txn_list,
+ void recover(const u_int16_t num_jfiles, const u_int32_t jfsize_sblks, vector<string>* txn_list,
u_int64_t& highest_rid)
{ jcntl::recover(num_jfiles, jfsize_sblks, JRNL_WMGR_DEF_PAGES,
JRNL_WMGR_DEF_PAGE_SIZE, 0,
&aio_wr_callback, txn_list, highest_rid); }
Modified: store/trunk/cpp/tests/jrnl/_st_read.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_read.cpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/tests/jrnl/_st_read.cpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -144,7 +144,6 @@
}
{
string msg;
- vector<string> txn_list;
u_int64_t hrid;
string rmsg;
string xid;
@@ -152,7 +151,7 @@
bool externalFlag;
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
jc.recover_complete();
for (int m=0; m<NUM_MSGS; m++)
@@ -187,7 +186,6 @@
}
{
string msg;
- vector<string> txn_list;
u_int64_t hrid;
string rmsg;
string xid;
@@ -195,7 +193,7 @@
bool externalFlag;
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
for (int m=0; m<NUM_MSGS; m++)
{
@@ -209,7 +207,6 @@
}
{
string msg;
- vector<string> txn_list;
u_int64_t hrid;
string rmsg;
string xid;
@@ -217,7 +214,7 @@
bool externalFlag;
test_jrnl jc(test_name, test_dir, test_name);
- jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, txn_list, hrid);
+ jc.recover(NUM_TEST_JFILES, TEST_JFSIZE_SBLKS, 0, hrid);
BOOST_CHECK_EQUAL(hrid, u_int64_t(NUM_MSGS - 1));
for (int m=0; m<NUM_MSGS; m++)
{
Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-06-30 19:03:29 UTC (rev 2172)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp 2008-07-02 20:54:27 UTC (rev 2173)
@@ -110,11 +110,10 @@
{
try
{
- std::vector<std::string> prep_txn_list;
u_int64_t highest_rid;
recover(_jpp->num_jfiles(), _jpp->jfsize_sblks(),
_jpp->wcache_num_pages(),
_jpp->wcache_pgsize_sblks(), aio_rd_callback, aio_wr_callback,
- prep_txn_list, highest_rid);
+ 0, highest_rid);
recover_complete();
}
catch (const rhm::journal::jexception& e)