[rhmessaging-commits] rhmessaging commits: r1430 - in store/trunk/cpp: lib and 2 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Tue Dec 4 22:18:17 EST 2007
Author: kpvdr
Date: 2007-12-04 22:18:17 -0500 (Tue, 04 Dec 2007)
New Revision: 1430
Removed:
store/trunk/cpp/lib/jrnl/README
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/jrnl/jcfg.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/lfh.cpp
store/trunk/cpp/lib/jrnl/lfh.hpp
store/trunk/cpp/lib/jrnl/nlfh.cpp
store/trunk/cpp/lib/jrnl/pmgr.hpp
store/trunk/cpp/lib/jrnl/rcvdat.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/rhm.spec.in
store/trunk/cpp/tests/OrderingTest.cpp
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/TransactionalTest.cpp
store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
Log:
Bugfixes, journal now uses parameterized file size and number.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-05 03:18:17 UTC (rev 1430)
@@ -60,8 +60,8 @@
enqueueXidDb(&env, 0),
dequeueXidDb(&env, 0),
prepareXidDb(&env, 0),
- numJrnlFiles(8), // TODO - make param
- jrnlFsizeSblks(3072), // TODO - make param
+ numJrnlFiles(8),
+ jrnlFsizePgs(24),
isInit(false),
envPath(envpath)
@@ -70,10 +70,12 @@
}
-bool BdbMessageStore::init(const std::string& dir, const bool async, const bool force)
+bool BdbMessageStore::init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs)
{
if (isInit) return true;
+ numJrnlFiles = jfiles;
+ jrnlFsizePgs = jfileSizePgs;
useAsync = async;
if (dir.size()>0) storeDir = dir;
@@ -102,13 +104,45 @@
txn.abort();
throw;
}
- ret = mode(async, force);
+ ret = mode(useAsync, force);
if (!ret) return false;
isInit = true;
return true;
}
+bool BdbMessageStore::init(const qpid::Options* options)
+{
+ const qpid::broker::Broker::Options* opts = static_cast<const qpid::broker::Broker::Options*>(options);
+
+ u_int16_t numJrnlFiles = opts->numJrnlFiles;
+ if (numJrnlFiles < JRNL_MIN_NUM_FILES)
+ {
+ numJrnlFiles = JRNL_MIN_NUM_FILES;
+ std::cout << "WARNING: parameter num-jfiles (" << opts->numJrnlFiles << ") below allowable minimum (" << numJrnlFiles << "); changing this parameter to minimum value." << std::endl;
+ }
+ else if (numJrnlFiles > 64)
+ {
+ numJrnlFiles = 64;
+ std::cout << "WARNING: parameter num-jfiles (" << opts->numJrnlFiles << ") above allowable maximum (" << numJrnlFiles << "); changing this parameter to maximum value." << std::endl;
+ }
+
+ u_int32_t jrnlFsizePgs = opts->jrnlFsizePgs;
+ u_int32_t jrnlMinFsizePgs = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
+ if (jrnlFsizePgs < jrnlMinFsizePgs)
+ {
+ jrnlFsizePgs = jrnlMinFsizePgs;
+ std::cout << "WARNING: parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") below allowable minimum (" << jrnlFsizePgs << "); changing this parameter to minimum value." << std::endl;
+ }
+ else if (jrnlFsizePgs > 1024) // (pgs) = 64MiB max file size
+ {
+ jrnlFsizePgs = 1024;
+ std::cout << "WARNING: parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") above allowable maximum (" << jrnlFsizePgs << "); changing this parameter to maximum value." << std::endl;
+ }
+
+ return init(opts->storeDir, opts->storeAsync, opts->storeForce, numJrnlFiles, jrnlFsizePgs);
+}
+
// true is async
bool BdbMessageStore::mode(const bool async, const bool force)
{
@@ -172,10 +206,10 @@
(*i)->close(0);
}
} catch (const DbException& e) {
- THROW_STORE_EXCEPTION_2("Error closing databases", e);
+ std::cerr << "Error closing databases: " << e.what() << std::endl;
} catch (const std::exception& e) {
- throw;
- }
+ std::cerr << e.what() << std::endl;
+ } catch (...) {}
}
void BdbMessageStore::truncate()
@@ -204,7 +238,7 @@
THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
}
if (usingJrnl()) {
- JournalImpl* jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), string("JournalData"), numJrnlFiles, jrnlFsizeSblks, defJournalGetEventsTimeout, defJournalFlushTimeout);
+ JournalImpl* jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), string("JournalData"), numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, defJournalGetEventsTimeout, defJournalFlushTimeout);
queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
try {
// init will create the deque's for the init...
@@ -383,7 +417,7 @@
if (usingJrnl())
{
const char* queueName = queue->getName().c_str();
- JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), numJrnlFiles, jrnlFsizeSblks, defJournalGetEventsTimeout, defJournalFlushTimeout);
+ JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, defJournalGetEventsTimeout, defJournalFlushTimeout);
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
try
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-12-05 03:18:17 UTC (rev 1430)
@@ -32,6 +32,7 @@
#include "PreparedTransaction.h"
#include "StoreException.h"
#include "TxnCtxt.h"
+#include <qpid/broker/Broker.h>
#include <qpid/broker/MessageStore.h>
#include <qpid/sys/Monitor.h>
#include <qpid/sys/Time.h>
@@ -47,7 +48,7 @@
namespace rhm {
namespace bdbstore {
using std::string;
-
+
/**
* An implementation of the MessageStore interface based on Berkeley DB
*/
@@ -60,6 +61,12 @@
typedef LockedMappings::map txn_lock_map;
typedef boost::ptr_list<PreparedTransaction> txn_list;
+ // Default store settings
+ static const bool defUseAsync = false;
+ static const bool defForceStoreConversion = false;
+ static const u_int16_t defNumJrnlFiles = 8;
+ static const u_int32_t defJrnlFileSizePgs = 24;
+
std::list<Db*> dbs;
DbEnv env;
Db queueDb;
@@ -76,8 +83,8 @@
IdSequence messageIdSequence;
static bool useAsync;
std::string storeDir;
- const u_int16_t numJrnlFiles;
- const u_int32_t jrnlFsizeSblks;
+ u_int16_t numJrnlFiles;
+ u_int32_t jrnlFsizePgs;
bool isInit;
const char* envPath;
static qpid::sys::Duration defJournalGetEventsTimeout;
@@ -130,12 +137,15 @@
string getJrnlDir(const char* queueName);
static inline bool usingJrnl() {return useAsync;}
string getJrnlBaseDir();
- inline void checkInit() { if (!isInit) init("/var",false, false); isInit = true;}
+ inline void checkInit() {
+ if (!isInit) init("/var", defUseAsync, defForceStoreConversion, defNumJrnlFiles, defJrnlFileSizePgs); isInit = true;
+ }
public:
BdbMessageStore(const char* envpath = 0);
- bool init(const std::string& dir, const bool async, const bool force = false);
virtual ~BdbMessageStore();
+ bool init(const qpid::Options* options);
+ bool init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs);
void truncate();
Deleted: store/trunk/cpp/lib/jrnl/README
===================================================================
--- store/trunk/cpp/lib/jrnl/README 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/README 2007-12-05 03:18:17 UTC (rev 1430)
@@ -1,39 +0,0 @@
-MESSAGE JOURNAL
-===============
-This directory contains the source for the persistent message journal.
-
-External dependencies:
-----------------------
-libaio-dev (run "yum install libaio-dev" as root to install)
-doxygen (required to generate documentation)
-
-TODO: *** THIS DOC IS OUT OF DATE ****
-
-Building:
----------
-For manual building, use the makefile Makefile.rtest in the tests/jrnl directory.
-Change to this directory to run the makefiles and regression tests:
-"make -f Makefile.rtest clean-all" then "make -f Makefile.rtest" To see all make options, run
-"make -f Makefile.rtest help". ** TODO: Check this as it may change **
-
-NOTE: This makefile is a manually created file, not a part of automake/autoconf (yet). It does not
-have complete dependency checking, so if a header file is changed, a clean make should be performed
-manually.
-
-Testing:
---------
-The tests are defined in jtest.cpp, and are numbered from 0 to 40 (more or less). To run an
-individual test, run "./jtest <csvfile> <testnum>" where testnum is the requried test number
-whose parameters are defined in the named csv file.
-
-To analyze the journal files (which are written to a new jdata dir)
-
-To run regression tests which go through all the tests repeatedly and analyse the results, run
-"rtest". This can take some time, especially for the valgrind check (the '*'
-symbol).
-
-Directories:
-------------
-Source: cpp/lib/jrnl
-Tests and test utils: cpp/tests/jrnl
-Documentation: ??? TBD
Modified: store/trunk/cpp/lib/jrnl/jcfg.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcfg.hpp 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/jcfg.hpp 2007-12-05 03:18:17 UTC (rev 1430)
@@ -68,6 +68,7 @@
//#define JRNL_FILE_SIZE 3072 ///< Journal file size in softblocks excl. file_hdr
//#define JRNL_NUM_FILES 8 ///< Number of journal files
+// NOTE: JRNL_RMGR_PAGE_SIZE must be a multiple of JRNL_WMGR_PAGE_SIZE.
#define JRNL_RMGR_PAGE_SIZE 128 ///< Journal page size in softblocks
#define JRNL_RMGR_PAGES 16 ///< Number of pages to use in wmgr
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-05 03:18:17 UTC (rev 1430)
@@ -121,7 +121,7 @@
// constrains read activity (i.e. one can't read what has not yet been written).
_wrfc.initialize(_num_jfiles, (nlfh**)_datafh);
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh);
- _rmgr.initialize(rdtoklp, rd_cb);
+ _rmgr.initialize(rdtoklp, rd_cb, 0);
_wmgr.initialize(wdtoklp, wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
// Write info file (<basename>.jinf) to disk
@@ -172,8 +172,8 @@
// constrains read activity (i.e. one can't read what has not yet been written).
_wrfc.initialize(_num_jfiles, (nlfh**)_datafh, &_rcvdat);
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
- _rmgr.initialize(rdtoklp, rd_cb);
- _wmgr.initialize(wdtoklp, wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
+ _rmgr.initialize(rdtoklp, rd_cb, _rcvdat._fro);
+ _wmgr.initialize(wdtoklp, wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS, _rcvdat._eo);
_readonly_flag = true;
_init_flag = true;
@@ -188,7 +188,7 @@
_datafh[i]->reset(&_rcvdat);
_wrfc.initialize(_num_jfiles, (nlfh**)_datafh, &_rcvdat);
_rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
- _rmgr.recover_complete();
+ _rmgr.recover_complete(_rcvdat._fro);
_readonly_flag = false;
//std::cout << "Journal revovery complete." << std::endl;
}
@@ -458,6 +458,24 @@
jcntl::rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list) throw (jexception)
{
jinf ji(_jdir.dirname() + "/" + _base_filename + "." + JRNL_INFO_EXTENSION, true);
+
+ // If the number of files does not tie up with the jinf file from the journal being recovered,
+ // use the jinf data.
+ if (_num_jfiles != ji.num_jfiles())
+ {
+ _num_jfiles = ji.num_jfiles();
+ _rcvdat._enq_cnt_list.resize(_num_jfiles);
+ std::cout << "WARNING: Recovery found " << _num_jfiles <<
+ " files (different from --num-jfiles parameter value)." << std::endl;
+ }
+ if (_jfsize_sblks != ji.jfsize_sblks())
+ {
+ _jfsize_sblks = ji.jfsize_sblks();
+ std::cout << "WARNING: Recovery found file size = " <<
+ (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) <<
+ " (different from --jfile-size-pgs parameter value)." << std::endl;
+ }
+
try
{
rd._ffid = ji.get_start_file();
@@ -518,7 +536,6 @@
if (!jfile_cycle(fid, ifsp, rd, true))
return false;
}
-//std::cout << " E";
if (!er.is_transient()) // Ignore transient msgs
{
rd._enq_cnt_list[fid]++;
@@ -533,7 +550,6 @@
else
_emap.insert_fid(h._rid, fid);
}
-//else std::cout << "t";
}
break;
case RHM_JDAT_DEQ_MAGIC:
@@ -547,7 +563,6 @@
if (!jfile_cycle(fid, ifsp, rd, true))
return false;
}
-//std::cout << " D";
if (dr.xid_size())
{
// If the enqueue is part of a pending txn, it will not yet be in emap
@@ -567,7 +582,6 @@
try
{
u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
-//std::cout << enq_fid;
rd._enq_cnt_list[enq_fid]--;
}
catch(const jexception& e)
@@ -588,7 +602,6 @@
if (!jfile_cycle(fid, ifsp, rd, true))
return false;
}
-//std::cout << " A";
// Delete this txn from tmap, unlock any locked records in emap
ar.get_xid(&xidp);
assert(xidp != NULL);
@@ -622,7 +635,6 @@
if (!jfile_cycle(fid, ifsp, rd, true))
return false;
}
-//std::cout << " C";
// Delete this txn from tmap, process records into emap
cr.get_xid(&xidp);
assert(xidp != NULL);
@@ -643,13 +655,11 @@
break;
case RHM_JDAT_EMPTY_MAGIC:
{
-//std::cout << " X";
u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
}
break;
case 0:
-//std::cout << " 0 ";
rd._lfid = fid;
rd._eo = ifsp->tellg();
return false;
@@ -694,7 +704,7 @@
{
std::stringstream ss;
ss << _jdir.dirname() << "/" << _base_filename << ".";
- ss << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
+ ss << std::hex << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
ifsp->open(ss.str().c_str());
if (!ifsp->good())
throw jexception(jerrno::JERR__FILEIO, ss.str().c_str(), "jcntl", "jfile_cycle");
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-05 03:18:17 UTC (rev 1430)
@@ -131,8 +131,8 @@
bool _autostop; ///< Autostop flag - stops journal when overrun occurs
// Journal control structures
- const u_int16_t _num_jfiles; ///< Number of journal files
- const u_int32_t _jfsize_sblks; ///< Journal file size in sblks
+ u_int16_t _num_jfiles; ///< Number of journal files
+ u_int32_t _jfsize_sblks; ///< Journal file size in sblks
lfh** _datafh; ///< Array of pointers to data file handles
enq_map _emap; ///< Enqueue map for low water mark management
txn_map _tmap; ///< Transaction map open transactions
Modified: store/trunk/cpp/lib/jrnl/lfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/lfh.cpp 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/lfh.cpp 2007-12-05 03:18:17 UTC (rev 1430)
@@ -43,7 +43,7 @@
{}
lfh::lfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t jfsize_sblks,
- rcvdat const * const ro) throw (jexception*):
+ rcvdat const * const ro) throw (jexception):
nlfh(fbasename, fid, jfsize_sblks, ro)
{}
Modified: store/trunk/cpp/lib/jrnl/lfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/lfh.hpp 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/lfh.hpp 2007-12-05 03:18:17 UTC (rev 1430)
@@ -57,7 +57,7 @@
public:
lfh(const u_int32_t jfsize_sblks);
lfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t jfsize_sblks,
- rcvdat const * const ro) throw (jexception*);
+ rcvdat const * const ro) throw (jexception);
virtual ~lfh();
};
Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-12-05 03:18:17 UTC (rev 1430)
@@ -114,8 +114,8 @@
{
if (ro->_ffid == _fid)
{
- _rd_subm_cnt_dblks = ro->_fro/JRNL_DBLK_SIZE;
- _rd_cmpl_cnt_dblks = ro->_fro/JRNL_DBLK_SIZE;
+ _rd_subm_cnt_dblks = JRNL_SBLK_SIZE;
+ _rd_cmpl_cnt_dblks = JRNL_SBLK_SIZE;
}
if (ro->_lfid == _fid)
{
@@ -196,8 +196,8 @@
{
if (ro->_ffid == _fid)
{
- _rd_subm_cnt_dblks = ro->_fro/JRNL_DBLK_SIZE;
- _rd_cmpl_cnt_dblks = ro->_fro/JRNL_DBLK_SIZE;
+ _rd_subm_cnt_dblks = JRNL_SBLK_SIZE;
+ _rd_cmpl_cnt_dblks = JRNL_SBLK_SIZE;
}
if (ro->_lfid == _fid)
{
@@ -211,7 +211,6 @@
}
_rec_enqcnt = ro->_enq_cnt_list[_fid];
return true;
- //return _fid == ro->_ffid ? ro->_full : true;
}
}
#ifndef RHM_WRONLY
Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-12-05 03:18:17 UTC (rev 1430)
@@ -150,7 +150,6 @@
protected:
virtual void initialize() throw (jexception);
-// virtual bool rotate_page(page_state state = UNUSED);
virtual void rotate_page() = 0;
virtual void clean();
};
Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-12-05 03:18:17 UTC (rev 1430)
@@ -94,7 +94,8 @@
std::cout << " Journal full (_full) = " << (_full ? "TRUE" : "FALSE") << std::endl;
std::cout << " Enqueued records (txn & non-txn):" << std::endl;
for (unsigned i=0; i<_enq_cnt_list.size(); i++)
- std::cout << " File " << i << ": " << _enq_cnt_list[i] << std::endl;
+ std::cout << " File " << std::setw(2) << i << ": " << _enq_cnt_list[i] <<
+ std::endl;
}
};
}
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-12-05 03:18:17 UTC (rev 1430)
@@ -60,11 +60,14 @@
{}
void
-rmgr::initialize(std::deque<data_tok*>* const dtoklp, const aio_cb rd_cb) throw (jexception)
+rmgr::initialize(std::deque<data_tok*>* const dtoklp, const aio_cb rd_cb, size_t fro)
+ throw (jexception)
{
_dtoklp = dtoklp;
_cb = rd_cb;
initialize();
+ if (fro)
+ _pg_offset_dblks = (fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
}
const iores
@@ -216,8 +219,6 @@
rmgr::read(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize, bool& transient,
bool& external, data_tok* dtokp) throw (jexception)
{
-//std::cout << " rmgr::read() ro=" << (_jc->is_read_only()?"T":"F") << " po=" << _pg_offset_dblks << " ems=" << _emap.size() << " tms=" << _tmap.size() << std::flush;
-
iores res = pre_read_check(dtokp);
if (res != RHM_IORES_SUCCESS)
{
@@ -225,7 +226,6 @@
return res;
}
-//std::cout << " c" << std::flush;
if (dtokp->rstate() == data_tok::SKIP_PART)
{
const iores res = skip(dtokp);
@@ -235,7 +235,6 @@
return res;
}
}
-//std::cout << " d" << std::flush;
if (dtokp->rstate() == data_tok::READ_PART)
{
assert(dtokp->rid() == _hdr._rid);
@@ -247,26 +246,20 @@
external = _enq_rec.is_external();
return res;
}
-//std::cout << " e" << std::flush;
_hdr.reset();
// Read header, determine next record type
while (true)
{
-//std::string s;
-//std::cout << " [f pi=" << _pg_index << " d=" << dblks_rem() << " c=" << (_rrfc.is_compl()?"T":"F") << " o=" << (_rrfc.is_wr_aio_outstanding()?"T":"F") << " status:" << _rrfc.file_handle()->status_str(s) << "]" << std::flush;
if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
{
aio_cycle(); // check if any AIOs have returned
return RHM_IORES_EMPTY;
}
-//std::cout << " g" << std::flush;
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
-//std::cout << "[" << _pg_index << "]=" << page_state_str(_page_cb_arr[_pg_index]._state) << std::flush;
aio_cycle();
return RHM_IORES_AIO_WAIT;
}
-//std::cout << " h:" << _pg_index << "-" << _pg_offset_dblks << std::flush;
void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
(_pg_offset_dblks * JRNL_DBLK_SIZE));
::memcpy(&_hdr, rptr, sizeof(hdr));
@@ -274,7 +267,6 @@
{
case RHM_JDAT_ENQ_MAGIC:
{
-//std::cout << " E" << std::flush;
_enq_rec.reset(); // sets enqueue rec size
// Check if RID of this rec is still enqueued, if so read it, else skip
#ifdef RHM_RDONLY
@@ -284,10 +276,7 @@
bool is_enq = false;
try
{
-//std::cout << " rid=0x" << std::hex << _hdr._rid << std::dec << std::flush;
-//std::cout << " rid=" << _hdr._rid << std::flush;
fid = _emap.get_fid(_hdr._rid);
-//std::cout << ":ok" << std::flush;
is_enq = true;
}
catch (const jexception& e)
@@ -295,7 +284,6 @@
// Block read for transactionally locked record (only when not recovering)
if (e.err_code() == jerrno::JERR_MAP_LOCKED && !_jc->is_read_only())
throw;
-//if (e.err_code() == jerrno::JERR_MAP_LOCKED) std::cout << ":locked" << std::flush;
// (Recover mode only) Ok, not in emap - now search tmap, if present then read
if (_jc->is_read_only())
@@ -305,27 +293,21 @@
for (std::vector<std::string>::iterator itr = xid_list.begin();
itr != xid_list.end() && !is_enq; itr++)
{
-//std::cout << "[xid=" << *itr << ":" << std::flush;
txn_data_list tx_list = _tmap.get_tdata_list(*itr);
for (tdl_itr ditr = tx_list.begin(); ditr != tx_list.end() && !is_enq;
ditr++)
{
-//std::cout << (ditr->_enq_flag?"enq":"deq") << ",rid=" << ditr->_rid << std::flush;
-//if (!ditr->_enq_flag) std::cout << ",drid=" << ditr->_drid << std::flush;
if (ditr->_enq_flag)
is_enq = ditr->_rid == _hdr._rid;
else
is_enq = ditr->_drid == _hdr._rid;
}
-//std::cout << "]" << std::flush;
}
}
}
#endif
-//std::cout << (is_enq?":enq":":not-enq") << std::flush;
if (is_enq) // ok, this record is enqueued, check it, then read it...
{
-//std::cout << "e" << std::flush;
#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
// Is this locked by a pending dequeue transaction?
try
@@ -376,29 +358,22 @@
return res;
}
else // skip this record, it is already dequeued
-//{ std::cout << "d" << std::flush;
consume_xid_rec(_hdr, rptr, dtokp);
-//}
break;
}
case RHM_JDAT_DEQ_MAGIC:
-//std::cout << " D" << std::flush;
consume_xid_rec(_hdr, rptr, dtokp);
break;
case RHM_JDAT_TXA_MAGIC:
-//std::cout << " A" << std::flush;
consume_xid_rec(_hdr, rptr, dtokp);
break;
case RHM_JDAT_TXC_MAGIC:
-//std::cout << " C" << std::flush;
consume_xid_rec(_hdr, rptr, dtokp);
break;
case RHM_JDAT_EMPTY_MAGIC:
-//std::cout << " X" << std::flush;
consume_filler();
break;
default:
-//std::cout << " ?" << std::flush;
std::stringstream ss;
ss << std::hex << std::setfill('0');
ss << "Magic=0x" << std::setw(8) << _hdr._magic << std::dec;
@@ -437,8 +412,10 @@
{
std::stringstream ss;
ss << "AIO read operation failed: " << strerror(-aioret) << " (" << aioret << ")";
- ss << " [pg=" << pcbp->_index << " size=" << iocbp->u.c.nbytes;
- ss << " offset=" << iocbp->u.c.offset << " fh=" << iocbp->aio_fildes << "]";
+ ss << " [pg=" << pcbp->_index << " buf=" << iocbp->u.c.buf;
+ ss << " rsize=0x" << std::hex << iocbp->u.c.nbytes;
+ ss << " offset=0x" << iocbp->u.c.offset << std::dec;
+ ss << " fh=" << iocbp->aio_fildes << "]";
throw jexception(jerrno::JERR__AIO, ss.str().c_str(), "rmgr", "get_events");
}
@@ -453,7 +430,6 @@
// Use stored pointer to nlfh in the pcb instead.
pcbp->_rdblks = iocbp->u.c.nbytes / JRNL_DBLK_SIZE;
pcbp->_rfh->add_rd_cmpl_cnt_dblks(pcbp->_rdblks);
-//std::cout << "{r*" << pcbp->_index << " fid=" << pcbp->_rfh->fid() << "}" << std::flush;
// Clean up this pcb's data_tok list
pcbp->_pdtokl->clear();
@@ -468,11 +444,11 @@
}
void
-rmgr::recover_complete()
+rmgr::recover_complete(size_t fro)
{
_pg_index = 0;
_pg_cntr = 0;
- _pg_offset_dblks = 0;
+ _pg_offset_dblks = (fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
}
void
@@ -517,11 +493,9 @@
{
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
-//std::cout << " S=" << page_state_str(_page_cb_arr[_pg_index]._state) << std::flush;
aio_cycle(); // check if any AIOs have returned
return RHM_IORES_AIO_WAIT;
}
-//std::cout << " T" << dtokp->dblocks_read() << "," << dblks_rem() << std::flush;
// Read data from this page, first block will have header and data size.
u_int32_t dblks_rd = _enq_rec.decode(h, rptr, dtokp->dblocks_read(), dblks_rem());
@@ -532,7 +506,6 @@
// If data still incomplete, move to next page and decode again
while (dtokp->dblocks_read() < _enq_rec.rec_size_dblks())
{
-//std::cout << " U" << std::flush;
rotate_page();
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
@@ -549,9 +522,7 @@
// If we have finished with this page, rotate it
if (dblks_rem() == 0)
-//{std::cout << " V" << std::flush;
rotate_page();
-//}
// Set the record size in dtokp
dtokp->set_rstate(data_tok::READ);
@@ -610,19 +581,13 @@
{
u_int32_t dsize_dblks = jrec::size_dblks(dtokp->dsize());
u_int32_t tot_dblk_cnt = dtokp->dblocks_read();
-//std::cout << " [S " << dsize_dblks << "," << tot_dblk_cnt << std::flush;
while (true)
{
u_int32_t this_dblk_cnt = 0;
if (dsize_dblks - tot_dblk_cnt > dblks_rem())
-//{std::cout << " x" << std::flush;
this_dblk_cnt = dblks_rem();
-//}
else
-//{std::cout << " f" << std::flush;
this_dblk_cnt = dsize_dblks - tot_dblk_cnt;
-//}
-//std::cout << "->" << this_dblk_cnt << std::flush;
if (this_dblk_cnt)
{
dtokp->incr_dblocks_read(this_dblk_cnt);
@@ -633,16 +598,12 @@
if (tot_dblk_cnt < dsize_dblks)
{
if (_pg_offset_dblks == JRNL_SBLK_SIZE * JRNL_RMGR_PAGE_SIZE)
-//{ std::cout << " rot" << std::flush;
rotate_page();
-//}
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
dtokp->set_rstate(data_tok::SKIP_PART);
-//std::cout << " w]" << std::flush;
return RHM_IORES_AIO_WAIT;
}
-//std::cout << " !" << std::flush;
}
else
{
@@ -650,7 +611,6 @@
dtokp->set_rstate(data_tok::UNREAD);
dtokp->set_dsize(0);
dtokp->set_dblocks_read(0);
-//std::cout << "]" << std::flush;
// If we have finished with this page, rotate it
if (dblks_rem() == 0)
@@ -728,7 +688,6 @@
_aio_evt_rem++;
_page_cb_arr[pi]._state = AIO_PENDING;
_page_cb_arr[pi]._rfh = _rrfc.file_handle();
-//std::cout << "{r^" << pi << "," << rd_size << "}" << std::flush;
}
else // If there is nothing to read for this page, neither will there be for the others...
break;
@@ -752,7 +711,6 @@
rmgr::rotate_page()
{
_page_cb_arr[_pg_index]._rdblks = 0;
-// pmgr::rotate_page();
_page_cb_arr[_pg_index]._state = UNUSED;
if (_pg_offset_dblks >= JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
{
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-12-05 03:18:17 UTC (rev 1430)
@@ -67,14 +67,15 @@
std::deque<data_tok*>* const dtokl) throw (jexception);
virtual ~rmgr();
- void initialize(std::deque<data_tok*>* const dtoklp, const aio_cb rd_cb) throw (jexception);
+ void initialize(std::deque<data_tok*>* const dtoklp, const aio_cb rd_cb, size_t fro)
+ throw (jexception);
const iores get(const u_int64_t& rid, const size_t& dsize, const size_t& dsize_avail,
const void** const data, bool auto_discard) throw (jexception);
const iores discard(data_tok* dtok) throw (jexception);
const iores read(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize,
bool& transient, bool& external, data_tok* dtokp) throw (jexception);
const u_int32_t get_events(page_state state = AIO_COMPLETE) throw (jexception);
- void recover_complete();
+ void recover_complete(size_t fro);
private:
void initialize() throw (jexception);
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-12-05 03:18:17 UTC (rev 1430)
@@ -59,7 +59,9 @@
_abort_busy(false),
_commit_busy(false),
_txn_pending_set()
-{}
+{
+ assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0);
+}
wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc, std::deque<data_tok*>* const dtoklp,
const u_int32_t max_dtokpp, const u_int32_t max_iowait_us) throw (jexception):
@@ -78,7 +80,9 @@
_abort_busy(false),
_commit_busy(false),
_txn_pending_set()
-{}
+{
+ assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0);
+}
wmgr::~wmgr()
{
@@ -87,13 +91,20 @@
void
wmgr::initialize(std::deque<data_tok*>* dtoklp, const aio_cb wr_cb, const u_int32_t max_dtokpp,
- const u_int32_t max_iowait_us) throw (jexception)
+ const u_int32_t max_iowait_us, size_t eo) throw (jexception)
{
_dtoklp = dtoklp;
_max_dtokpp = max_dtokpp;
_max_io_wait_us = max_iowait_us;
_cb = wr_cb;
initialize();
+ if (eo)
+ {
+ const u_int32_t wr_pg_size_dblks = JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+ u_int32_t data_dblks = (eo / JRNL_DBLK_SIZE) - 4; // 4 dblks for file hdr
+ _pg_cntr = data_dblks / wr_pg_size_dblks;
+ _pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks);
+ }
}
const iores
@@ -101,7 +112,6 @@
data_tok* dtokp, const void* const xid_ptr, const size_t xid_len, const bool transient,
const bool external) throw (jexception)
{
-//std::cout << "wmgr::enqueue() dl=" << tot_data_len << " xl=" << xid_len << " t=" << (transient?"T":"F") << " e=" << (external?"T":"F") << " msg=" << (data_buff?std::string((const char*)data_buff, tot_data_len):"<null>") << std::endl;
if (xid_len)
assert(xid_ptr != NULL);
@@ -727,10 +737,8 @@
throw jexception(jerrno::JERR__AIO, "wmgr", "write_flush");
_wrfc.add_subm_cnt_dblks(_cached_offset_dblks);
_aio_evt_rem++;
-//std::cout << "{w^" << _pg_index << "," << _cached_offset_dblks << "}" << std::flush;
_cached_offset_dblks = 0;
-// rotate_page(AIO_PENDING); // increments _pg_index, resets _pg_offset_dblks if req'd
rotate_page(); // increments _pg_index, resets _pg_offset_dblks if req'd
if (_page_cb_arr[_pg_index]._state == UNUSED)
_page_cb_arr[_pg_index]._state = IN_USE;
@@ -787,7 +795,6 @@
}
if (pcbp) // Page writes have pcb
{
-//std::cout << "{w*" << pcbp->_index << "," << pcbp->_wdblks << "}" << std::flush;
u_int32_t s = pcbp->_pdtokl->size();
for (u_int32_t k=0; k<s; k++)
{
@@ -1018,7 +1025,6 @@
if (::io_submit(_ioctx, 1, &iocbp) < 0)
throw jexception(jerrno::JERR__AIO, "wmgr", "write_fhdr");
_aio_evt_rem++;
-//std::cout << "{f^" << fid << "," << JRNL_SBLK_SIZE << "," << iocbp << "}" << std::flush;
_wrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
}
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-12-05 03:18:17 UTC (rev 1430)
@@ -104,7 +104,8 @@
virtual ~wmgr();
void initialize(std::deque<data_tok*>* const dtoklp, aio_cb wr_cb,
- const u_int32_t max_dtokpp, const u_int32_t max_iowait_us) throw (jexception);
+ const u_int32_t max_dtokpp, const u_int32_t max_iowait_us, size_t eo = 0)
+ throw (jexception);
const iores enqueue(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const void* const xid_ptr,
const size_t xid_len, const bool transient, const bool external) throw (jexception);
Modified: store/trunk/cpp/rhm.spec.in
===================================================================
--- store/trunk/cpp/rhm.spec.in 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/rhm.spec.in 2007-12-05 03:18:17 UTC (rev 1430)
@@ -3,7 +3,7 @@
#
Name: rhm
Version: @VERSION@
-Release: 2%{?dist}
+Release: 3%{?dist}
Summary: Red Hat extensions to the Qpid messaging system
Group: System Environment/Libraries
License: LGPL
Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/tests/OrderingTest.cpp 2007-12-05 03:18:17 UTC (rev 1430)
@@ -104,7 +104,7 @@
void setup(bool async)
{
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init("/var",async,true);
+ store->init("/var", async, true, 4, 1);
store->truncate();
queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0));
@@ -146,7 +146,7 @@
store.reset();
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init("/var",async);
+ store->init("/var", async, false, 4, 1);
ExchangeRegistry exchanges;
DtxManager mgr(store.get());
RecoveryManagerImpl recoveryMgr(queues, exchanges, mgr, 0);
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2007-12-05 03:18:17 UTC (rev 1430)
@@ -116,7 +116,7 @@
void testEmptyRecover(bool async)
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
store.truncate();//make sure it is empty to begin with
QueueRegistry registry(&store);
recover(store, registry);
@@ -129,7 +129,7 @@
void testCreateDelete(bool async)
{
BdbMessageStore store;
- store.init("/var",async, true);
+ store.init("/var", async, true, 4, 1);
store.truncate();//make sure it is empty to begin with
string name("CreateDeleteQueue");
Queue queue(name, 0, &store, 0);
@@ -150,7 +150,7 @@
string name("MyDurableQueue");
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
store.truncate();//make sure it is empty to begin with
Queue queue(name, 0, &store, 0);
store.create(queue);
@@ -159,7 +159,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
QueueRegistry registry(&store);
recover(store, registry);
Queue::shared_ptr queue = registry.find(name);
@@ -177,7 +177,7 @@
string name("MyDurableQueue");
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
store.truncate();//make sure it is empty to begin with
Queue queue(name, 0, &store, 0);
FieldTable settings;
@@ -187,7 +187,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
QueueRegistry registry(&store);
recover(store, registry);
Queue::shared_ptr queue = registry.find(name);
@@ -205,7 +205,7 @@
string name("MyDurableQueue");
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
store.truncate();//make sure it is empty to begin with
Queue queue(name, 0, &store, 0);
store.create(queue);
@@ -213,7 +213,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
QueueRegistry registry(&store);
recover(store, registry);
CPPUNIT_ASSERT(!registry.find(name));
@@ -236,7 +236,7 @@
string data2("hijklmn");
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
store.truncate();//make sure it is empty to begin with
Queue queue(name, 0, &store, 0);
FieldTable settings;
@@ -255,7 +255,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
QueueRegistry registry(&store);
recover(store, registry);
Queue::shared_ptr queue = registry.find(name);
@@ -293,7 +293,7 @@
string messageId = "MyMessage";
string data("abcdefg");
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
store.truncate();//make sure it is empty to begin with
Queue queue(name, 0, &store, 0);
FieldTable settings;
@@ -308,7 +308,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
QueueRegistry registry(&store);
recover(store, registry);
Queue::shared_ptr queue = registry.find(name);
@@ -329,7 +329,7 @@
const string data2("ABCDEFGHIJKLMNOPQRSTUVWXYZ");
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
store.truncate();//make sure it is empty to begin with
//create & stage a message
@@ -376,7 +376,7 @@
{
//recover
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
QueueRegistry registry(&store);
ExchangeRegistry exchanges;
DtxManager dtx(&store);
@@ -423,7 +423,7 @@
void testDestroyStagedMessage(bool async)
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
store.truncate();//make sure it is empty to begin with
const string data("abcdefg");
@@ -449,7 +449,7 @@
void testDestroyEnqueuedMessage(bool async)
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
store.truncate();//make sure it is empty to begin with
const string data("abcdefg");
@@ -484,7 +484,7 @@
args.setString("a", "A");
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
store.truncate();//make sure it is empty to begin with
ExchangeRegistry registry;
Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
@@ -494,7 +494,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
ExchangeRegistry registry;
recover(store, registry);
@@ -508,7 +508,7 @@
}
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
ExchangeRegistry registry;
recover(store, registry);
@@ -532,7 +532,7 @@
FieldTable args;
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
store.truncate();//make sure it is empty to begin with
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
@@ -543,7 +543,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
@@ -557,7 +557,7 @@
}
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
@@ -581,7 +581,7 @@
FieldTable args;
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
store.truncate();//make sure it is empty to begin with
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
@@ -596,7 +596,7 @@
}//db will be closed
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
@@ -612,7 +612,7 @@
}
{
BdbMessageStore store;
- store.init("/var",async);
+ store.init("/var", async, false, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/tests/TransactionalTest.cpp 2007-12-05 03:18:17 UTC (rev 1430)
@@ -95,7 +95,7 @@
void setup(bool async)
{
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init("/var",async, true);
+ store->init("/var", async, true, 4, 1);
store->truncate();
//create two queues:
@@ -119,7 +119,7 @@
store.reset();
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init("/var",async);
+ store->init("/var", async, false, 4, 1);
ExchangeRegistry exchanges;
DtxManager mgr(store.get());
RecoveryManagerImpl recovery(queues, exchanges, mgr, 0);
Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp 2007-12-05 03:18:17 UTC (rev 1430)
@@ -336,7 +336,7 @@
void setup()
{
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init("/var",async,true);
+ store->init("/var", async, true, 4, 1);
store->truncate();
//create two queues:
@@ -361,7 +361,7 @@
store.reset();
store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
- store->init("/var",async);
+ store->init("/var", async, false, 4, 1);
ExchangeRegistry exchanges;
dtxmgr = std::auto_ptr<DtxManager>(new DtxManager(store.get()));
RecoveryManagerImpl recovery(queues, exchanges, *dtxmgr, 0);
More information about the rhmessaging-commits
mailing list