[rhmessaging-commits] rhmessaging commits: r1430 - in store/trunk/cpp: lib and 2 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Dec 4 22:18:17 EST 2007


Author: kpvdr
Date: 2007-12-04 22:18:17 -0500 (Tue, 04 Dec 2007)
New Revision: 1430

Removed:
   store/trunk/cpp/lib/jrnl/README
Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
   store/trunk/cpp/lib/jrnl/jcfg.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/lfh.cpp
   store/trunk/cpp/lib/jrnl/lfh.hpp
   store/trunk/cpp/lib/jrnl/nlfh.cpp
   store/trunk/cpp/lib/jrnl/pmgr.hpp
   store/trunk/cpp/lib/jrnl/rcvdat.hpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/rmgr.hpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
   store/trunk/cpp/lib/jrnl/wmgr.hpp
   store/trunk/cpp/rhm.spec.in
   store/trunk/cpp/tests/OrderingTest.cpp
   store/trunk/cpp/tests/SimpleTest.cpp
   store/trunk/cpp/tests/TransactionalTest.cpp
   store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
Log:
Bug fixes; the journal now takes its file size and number of files as parameters.

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-12-05 03:18:17 UTC (rev 1430)
@@ -60,8 +60,8 @@
                                                         enqueueXidDb(&env, 0), 
                                                         dequeueXidDb(&env, 0), 
                                                         prepareXidDb(&env, 0),
-                                                        numJrnlFiles(8),        // TODO - make param
-                                                        jrnlFsizeSblks(3072),   // TODO - make param
+                                                        numJrnlFiles(8),
+                                                        jrnlFsizePgs(24),
 														isInit(false),
 														envPath(envpath)
 
@@ -70,10 +70,12 @@
 
 }
 
-bool BdbMessageStore::init(const std::string& dir, const bool async, const bool force) 
+bool BdbMessageStore::init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs) 
 { 
 	if (isInit) return true;
 	
+    numJrnlFiles = jfiles;
+    jrnlFsizePgs = jfileSizePgs;
 	useAsync = async;
 	if (dir.size()>0) storeDir = dir;
 
@@ -102,13 +104,45 @@
         txn.abort();
         throw;
     }
-	ret = mode(async, force);
+	ret = mode(useAsync, force);
 	if (!ret) return false;
 
 	isInit = true;
 	return true;
 }
 
+bool BdbMessageStore::init(const qpid::Options* options) 
+{
+    const qpid::broker::Broker::Options* opts = static_cast<const qpid::broker::Broker::Options*>(options);
+    
+    u_int16_t numJrnlFiles = opts->numJrnlFiles;
+    if (numJrnlFiles < JRNL_MIN_NUM_FILES)
+    {
+        numJrnlFiles = JRNL_MIN_NUM_FILES;
+        std::cout << "WARNING: parameter num-jfiles (" << opts->numJrnlFiles << ") below allowable minimum (" << numJrnlFiles << "); changing this parameter to minimum value." << std::endl;
+    }
+    else if (numJrnlFiles > 64)
+    {
+        numJrnlFiles = 64;
+        std::cout << "WARNING: parameter num-jfiles (" << opts->numJrnlFiles << ") above allowable maximum (" << numJrnlFiles << "); changing this parameter to maximum value." << std::endl;
+    }
+
+    u_int32_t jrnlFsizePgs = opts->jrnlFsizePgs;
+    u_int32_t jrnlMinFsizePgs = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
+    if (jrnlFsizePgs < jrnlMinFsizePgs)
+    {
+        jrnlFsizePgs = jrnlMinFsizePgs;
+        std::cout << "WARNING: parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") below allowable minimum (" << jrnlFsizePgs << "); changing this parameter to minimum value." << std::endl;
+    }
+    else if (jrnlFsizePgs > 1024) // (pgs) = 64MiB max file size
+    {
+        jrnlFsizePgs = 1024;
+        std::cout << "WARNING: parameter jfile-size-pgs (" << opts->jrnlFsizePgs << ") above allowable maximum (" << jrnlFsizePgs << "); changing this parameter to maximum value." << std::endl;
+    }
+
+    return init(opts->storeDir, opts->storeAsync, opts->storeForce, numJrnlFiles, jrnlFsizePgs);
+}
+
 // true is async
 bool BdbMessageStore::mode(const bool async, const bool force)
 {
@@ -172,10 +206,10 @@
             (*i)->close(0);
         }
     } catch (const DbException& e) {
-        THROW_STORE_EXCEPTION_2("Error closing databases", e);
+        std::cerr << "Error closing databases: " <<  e.what() << std::endl;
     } catch (const std::exception& e) {
-        throw;
-    }   
+        std::cerr << e.what() << std::endl;
+    } catch (...) {}
 }
 
 void BdbMessageStore::truncate()
@@ -204,7 +238,7 @@
         THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
     }
     if (usingJrnl()) {
-        JournalImpl* jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), string("JournalData"), numJrnlFiles, jrnlFsizeSblks, defJournalGetEventsTimeout, defJournalFlushTimeout);
+        JournalImpl* jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), string("JournalData"), numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, defJournalGetEventsTimeout, defJournalFlushTimeout);
         queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
 		try	{
 	     	// init will create the deque's for the init...
@@ -383,7 +417,7 @@
         if (usingJrnl())
         {
 	          const char* queueName = queue->getName().c_str();
-              JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), numJrnlFiles, jrnlFsizeSblks, defJournalGetEventsTimeout, defJournalFlushTimeout);
+              JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), numJrnlFiles, jrnlFsizePgs * JRNL_RMGR_PAGE_SIZE, defJournalGetEventsTimeout, defJournalFlushTimeout);
               queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
 	
 	          try

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2007-12-05 03:18:17 UTC (rev 1430)
@@ -32,6 +32,7 @@
 #include "PreparedTransaction.h"
 #include "StoreException.h"
 #include "TxnCtxt.h"
+#include <qpid/broker/Broker.h>
 #include <qpid/broker/MessageStore.h>
 #include <qpid/sys/Monitor.h>
 #include <qpid/sys/Time.h>
@@ -47,7 +48,7 @@
 namespace rhm {
     namespace bdbstore {
         using std::string;
-	
+
         /**
          * An implementation of the MessageStore interface based on Berkeley DB
          */
@@ -60,6 +61,12 @@
             typedef LockedMappings::map txn_lock_map;
             typedef boost::ptr_list<PreparedTransaction> txn_list;
 
+            // Default store settings
+            static const bool defUseAsync = false;
+            static const bool defForceStoreConversion = false;
+            static const u_int16_t defNumJrnlFiles = 8;
+            static const u_int32_t defJrnlFileSizePgs = 24;
+
             std::list<Db*> dbs;
             DbEnv env;
             Db queueDb;
@@ -76,8 +83,8 @@
             IdSequence messageIdSequence;
 			static bool useAsync;
 			std::string storeDir;
-            const u_int16_t numJrnlFiles;
-            const u_int32_t jrnlFsizeSblks;
+            u_int16_t numJrnlFiles;
+            u_int32_t jrnlFsizePgs;
 			bool isInit;
 			const char* envPath;
             static qpid::sys::Duration defJournalGetEventsTimeout;
@@ -130,12 +137,15 @@
 	  	  	string getJrnlDir(const char* queueName);
 	  	  	static inline bool usingJrnl() {return useAsync;} 
 	  	  	string getJrnlBaseDir(); 
-			inline void checkInit() { if (!isInit)	init("/var",false, false);	isInit = true;}
+			inline void checkInit() {
+                if (!isInit) init("/var", defUseAsync, defForceStoreConversion, defNumJrnlFiles, defJrnlFileSizePgs); isInit = true;
+            }
 
         public:
             BdbMessageStore(const char* envpath = 0);
-			bool init(const std::string& dir, const bool async, const bool force = false);
             virtual ~BdbMessageStore();
+			bool init(const qpid::Options* options);
+			bool init(const std::string& dir, const bool async, const bool force, u_int16_t jfiles, u_int32_t jfileSizePgs);
 
             void truncate();
 

Deleted: store/trunk/cpp/lib/jrnl/README
===================================================================
--- store/trunk/cpp/lib/jrnl/README	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/README	2007-12-05 03:18:17 UTC (rev 1430)
@@ -1,39 +0,0 @@
-MESSAGE JOURNAL
-===============
-This directory contains the source for the persistent message journal.
-
-External dependencies:
-----------------------
-libaio-dev (run "yum install libaio-dev" as root to install)
-doxygen (required to generate documentation)
-
-TODO: *** THIS DOC IS OUT OF DATE ****
-
-Building:
----------
-For manual building, use the makefile Makefile.rtest in the tests/jrnl directory.
-Change to this directory to run the makefiles and regression tests:
-"make -f Makefile.rtest clean-all" then "make -f Makefile.rtest" To see all make options, run
-"make -f Makefile.rtest help". ** TODO: Check this as it may change **
-
-NOTE: This makefile is a manually created file, not a part of automake/autoconf (yet). It does not
-have complete dependency checking, so if a header file is changed, a clean make should be performed
-manually.
-
-Testing:
---------
-The tests are defined in jtest.cpp, and are numbered from 0 to 40 (more or less). To run an
-individual test, run "./jtest <csvfile> <testnum>" where testnum is the requried test number
-whose parameters are defined in the named csv file.
-
-To analyze the journal files (which are written to a new jdata dir)
-
-To run regression tests which go through all the tests repeatedly and analyse the results, run
-"rtest". This can take some time, especially for the valgrind check (the '*'
-symbol).
-
-Directories:
-------------
-Source: cpp/lib/jrnl
-Tests and test utils: cpp/tests/jrnl
-Documentation: ??? TBD

Modified: store/trunk/cpp/lib/jrnl/jcfg.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcfg.hpp	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/jcfg.hpp	2007-12-05 03:18:17 UTC (rev 1430)
@@ -68,6 +68,7 @@
 //#define JRNL_FILE_SIZE          3072        ///< Journal file size in softblocks excl. file_hdr
 //#define JRNL_NUM_FILES          8           ///< Number of journal files
 
+// NOTE: JRNL_RMGR_PAGE_SIZE must be a multiple of JRNL_WMGR_PAGE_SIZE.
 #define JRNL_RMGR_PAGE_SIZE     128         ///< Journal page size in softblocks
 #define JRNL_RMGR_PAGES         16          ///< Number of pages to use in wmgr
 

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-12-05 03:18:17 UTC (rev 1430)
@@ -121,7 +121,7 @@
     // constrains read activity (i.e. one can't read what has not yet been written).
     _wrfc.initialize(_num_jfiles, (nlfh**)_datafh);
     _rrfc.initialize(_num_jfiles, (nlfh**)_datafh);
-    _rmgr.initialize(rdtoklp, rd_cb);
+    _rmgr.initialize(rdtoklp, rd_cb, 0);
     _wmgr.initialize(wdtoklp, wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
 
     // Write info file (<basename>.jinf) to disk
@@ -172,8 +172,8 @@
     // constrains read activity (i.e. one can't read what has not yet been written).
     _wrfc.initialize(_num_jfiles, (nlfh**)_datafh, &_rcvdat);
     _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
-    _rmgr.initialize(rdtoklp, rd_cb);
-    _wmgr.initialize(wdtoklp, wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
+    _rmgr.initialize(rdtoklp, rd_cb, _rcvdat._fro);
+    _wmgr.initialize(wdtoklp, wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS, _rcvdat._eo);
     
     _readonly_flag = true;
     _init_flag = true;
@@ -188,7 +188,7 @@
         _datafh[i]->reset(&_rcvdat);
     _wrfc.initialize(_num_jfiles, (nlfh**)_datafh, &_rcvdat);
     _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
-    _rmgr.recover_complete();
+    _rmgr.recover_complete(_rcvdat._fro);
     _readonly_flag = false;
 //std::cout << "Journal revovery complete." << std::endl;
 }
@@ -458,6 +458,24 @@
 jcntl::rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list) throw (jexception)
 {
     jinf ji(_jdir.dirname() + "/" + _base_filename + "." + JRNL_INFO_EXTENSION, true);
+
+    // If the number of files or the file size does not match the jinf file of the journal being
+    // recovered, use the values from the jinf file.
+    if (_num_jfiles != ji.num_jfiles())
+    {
+        _num_jfiles = ji.num_jfiles();
+        _rcvdat._enq_cnt_list.resize(_num_jfiles);
+        std::cout << "WARNING: Recovery found " << _num_jfiles <<
+                " files (different from --num-jfiles parameter value)." << std::endl;
+    }
+    if (_jfsize_sblks != ji.jfsize_sblks())
+    {
+        _jfsize_sblks = ji.jfsize_sblks();
+        std::cout << "WARNING: Recovery found file size = " <<
+                (_jfsize_sblks / JRNL_RMGR_PAGE_SIZE) <<
+                " (different from --jfile-size-pgs parameter value)." << std::endl;
+    }
+
     try
     {
         rd._ffid = ji.get_start_file();
@@ -518,7 +536,6 @@
                     if (!jfile_cycle(fid, ifsp, rd, true))
                         return false;
                 }
-//std::cout << " E";
                 if (!er.is_transient()) // Ignore transient msgs
                 {
                     rd._enq_cnt_list[fid]++;
@@ -533,7 +550,6 @@
                     else
                         _emap.insert_fid(h._rid, fid);
                 }
-//else std::cout << "t";
             }
             break;
         case RHM_JDAT_DEQ_MAGIC:
@@ -547,7 +563,6 @@
                     if (!jfile_cycle(fid, ifsp, rd, true))
                         return false;
                 }
-//std::cout << " D";
                 if (dr.xid_size())
                 {
                     // If the enqueue is part of a pending txn, it will not yet be in emap
@@ -567,7 +582,6 @@
                     try
                     {
                         u_int16_t enq_fid = _emap.get_remove_fid(dr.deq_rid());
-//std::cout << enq_fid;
                         rd._enq_cnt_list[enq_fid]--;
                     }
                     catch(const jexception& e)
@@ -588,7 +602,6 @@
                     if (!jfile_cycle(fid, ifsp, rd, true))
                         return false;
                 }
-//std::cout << " A";
                 // Delete this txn from tmap, unlock any locked records in emap
                 ar.get_xid(&xidp);
                 assert(xidp != NULL);
@@ -622,7 +635,6 @@
                     if (!jfile_cycle(fid, ifsp, rd, true))
                         return false;
                 }
-//std::cout << " C";
                 // Delete this txn from tmap, process records into emap
                 cr.get_xid(&xidp);
                 assert(xidp != NULL);
@@ -643,13 +655,11 @@
             break;
         case RHM_JDAT_EMPTY_MAGIC:
             {
-//std::cout << " X";
                 u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
                 ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
             }
         break;
         case 0:
-//std::cout << " 0 ";
             rd._lfid = fid;
             rd._eo = ifsp->tellg();
             return false;
@@ -694,7 +704,7 @@
     {
         std::stringstream ss;
         ss << _jdir.dirname() << "/" << _base_filename << ".";
-        ss << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
+        ss << std::hex << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
         ifsp->open(ss.str().c_str());
         if (!ifsp->good())
             throw jexception(jerrno::JERR__FILEIO, ss.str().c_str(), "jcntl", "jfile_cycle");

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-12-05 03:18:17 UTC (rev 1430)
@@ -131,8 +131,8 @@
         bool _autostop;             ///< Autostop flag - stops journal when overrun occurs
 
         // Journal control structures
-        const u_int16_t _num_jfiles; ///< Number of journal files
-        const u_int32_t _jfsize_sblks; ///< Journal file size in sblks
+        u_int16_t _num_jfiles;      ///< Number of journal files
+        u_int32_t _jfsize_sblks;    ///< Journal file size in sblks
         lfh** _datafh;              ///< Array of pointers to data file handles
         enq_map _emap;              ///< Enqueue map for low water mark management
         txn_map _tmap;              ///< Transaction map open transactions

Modified: store/trunk/cpp/lib/jrnl/lfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/lfh.cpp	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/lfh.cpp	2007-12-05 03:18:17 UTC (rev 1430)
@@ -43,7 +43,7 @@
 {}
 
 lfh::lfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t jfsize_sblks,
-        rcvdat const * const ro) throw (jexception*):
+        rcvdat const * const ro) throw (jexception):
         nlfh(fbasename, fid, jfsize_sblks, ro)
 {}
 

Modified: store/trunk/cpp/lib/jrnl/lfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/lfh.hpp	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/lfh.hpp	2007-12-05 03:18:17 UTC (rev 1430)
@@ -57,7 +57,7 @@
     public:
         lfh(const u_int32_t jfsize_sblks);
         lfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t jfsize_sblks,
-                rcvdat const * const ro) throw (jexception*);
+                rcvdat const * const ro) throw (jexception);
         virtual ~lfh();
     };
 

Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp	2007-12-05 03:18:17 UTC (rev 1430)
@@ -114,8 +114,8 @@
             {
                 if (ro->_ffid == _fid)
                 {
-                    _rd_subm_cnt_dblks = ro->_fro/JRNL_DBLK_SIZE;
-                    _rd_cmpl_cnt_dblks = ro->_fro/JRNL_DBLK_SIZE;   
+                    _rd_subm_cnt_dblks = JRNL_SBLK_SIZE;
+                    _rd_cmpl_cnt_dblks = JRNL_SBLK_SIZE;
                 }
                 if (ro->_lfid == _fid)
                 {
@@ -196,8 +196,8 @@
         {
             if (ro->_ffid == _fid)
             {
-                _rd_subm_cnt_dblks = ro->_fro/JRNL_DBLK_SIZE;
-                _rd_cmpl_cnt_dblks = ro->_fro/JRNL_DBLK_SIZE;   
+                _rd_subm_cnt_dblks = JRNL_SBLK_SIZE;
+                _rd_cmpl_cnt_dblks = JRNL_SBLK_SIZE;
             }
             if (ro->_lfid == _fid)
             {
@@ -211,7 +211,6 @@
             }
             _rec_enqcnt = ro->_enq_cnt_list[_fid];
             return true;
-            //return _fid == ro->_ffid ? ro->_full : true;
         }
     }
 #ifndef RHM_WRONLY

Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp	2007-12-05 03:18:17 UTC (rev 1430)
@@ -150,7 +150,6 @@
 
     protected:
         virtual void initialize() throw (jexception);
-//         virtual bool rotate_page(page_state state = UNUSED);
         virtual void rotate_page() = 0;
         virtual void clean();
     };

Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp	2007-12-05 03:18:17 UTC (rev 1430)
@@ -94,7 +94,8 @@
                 std::cout << "  Journal full (_full) = " << (_full ? "TRUE" : "FALSE") << std::endl;
                 std::cout << "  Enqueued records (txn & non-txn):" << std::endl;
                 for (unsigned i=0; i<_enq_cnt_list.size(); i++)
-                    std::cout << "    File " << i << ": " << _enq_cnt_list[i] << std::endl;
+                    std::cout << "    File " << std::setw(2) << i << ": " << _enq_cnt_list[i] <<
+                            std::endl;
             }
         };
 }

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-12-05 03:18:17 UTC (rev 1430)
@@ -60,11 +60,14 @@
 {}
 
 void
-rmgr::initialize(std::deque<data_tok*>* const dtoklp, const aio_cb rd_cb) throw (jexception)
+rmgr::initialize(std::deque<data_tok*>* const dtoklp, const aio_cb rd_cb, size_t fro)
+        throw (jexception)
 {
     _dtoklp = dtoklp;
     _cb = rd_cb;
     initialize();
+    if (fro)
+        _pg_offset_dblks = (fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
 }
 
 const iores
@@ -216,8 +219,6 @@
 rmgr::read(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize, bool& transient,
         bool& external, data_tok* dtokp) throw (jexception)
 {
-//std::cout << " rmgr::read() ro=" << (_jc->is_read_only()?"T":"F") << " po=" << _pg_offset_dblks << " ems=" << _emap.size() << " tms=" << _tmap.size() << std::flush;
-
     iores res = pre_read_check(dtokp);
     if (res != RHM_IORES_SUCCESS)
     {
@@ -225,7 +226,6 @@
         return res;
     }
 
-//std::cout << " c" << std::flush;
     if (dtokp->rstate() == data_tok::SKIP_PART)
     {
         const iores res = skip(dtokp);
@@ -235,7 +235,6 @@
             return res;
         }
     }
-//std::cout << " d" << std::flush;
     if (dtokp->rstate() == data_tok::READ_PART)
     {
         assert(dtokp->rid() == _hdr._rid);
@@ -247,26 +246,20 @@
         external = _enq_rec.is_external();
         return res;
     }
-//std::cout << " e" << std::flush;
     _hdr.reset();
     // Read header, determine next record type
     while (true)
     {
-//std::string s;
-//std::cout << " [f pi=" << _pg_index << " d=" << dblks_rem() << " c=" << (_rrfc.is_compl()?"T":"F") << " o=" << (_rrfc.is_wr_aio_outstanding()?"T":"F") << " status:" << _rrfc.file_handle()->status_str(s) << "]" << std::flush;
         if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
         {
             aio_cycle();   // check if any AIOs have returned
             return RHM_IORES_EMPTY;
         }
-//std::cout << " g" << std::flush;
         if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
         {
-//std::cout << "[" << _pg_index << "]=" << page_state_str(_page_cb_arr[_pg_index]._state) << std::flush;
             aio_cycle();
             return RHM_IORES_AIO_WAIT;
         }
-//std::cout << " h:" << _pg_index << "-" << _pg_offset_dblks << std::flush;
         void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
                 (_pg_offset_dblks * JRNL_DBLK_SIZE));
         ::memcpy(&_hdr, rptr, sizeof(hdr));
@@ -274,7 +267,6 @@
         {
             case RHM_JDAT_ENQ_MAGIC:
             {
-//std::cout << " E" << std::flush;
                 _enq_rec.reset(); // sets enqueue rec size
                 // Check if RID of this rec is still enqueued, if so read it, else skip
 #ifdef RHM_RDONLY
@@ -284,10 +276,7 @@
                 bool is_enq = false;
                 try
                 {
-//std::cout << " rid=0x" << std::hex << _hdr._rid << std::dec << std::flush;
-//std::cout << " rid=" << _hdr._rid << std::flush;
                     fid = _emap.get_fid(_hdr._rid);
-//std::cout << ":ok" << std::flush;
                     is_enq = true;
                 }
                 catch (const jexception& e)
@@ -295,7 +284,6 @@
                     // Block read for transactionally locked record (only when not recovering)
                     if (e.err_code() == jerrno::JERR_MAP_LOCKED && !_jc->is_read_only())
                         throw;
-//if (e.err_code() == jerrno::JERR_MAP_LOCKED) std::cout << ":locked" << std::flush;
 						
 					// (Recover mode only) Ok, not in emap - now search tmap, if present then read
 					if (_jc->is_read_only())
@@ -305,27 +293,21 @@
 						for (std::vector<std::string>::iterator itr = xid_list.begin();
                                 itr != xid_list.end() && !is_enq; itr++)
 						{
-//std::cout << "[xid=" << *itr << ":" << std::flush;
 							txn_data_list tx_list = _tmap.get_tdata_list(*itr);
 							for (tdl_itr ditr = tx_list.begin(); ditr != tx_list.end() && !is_enq;
                                     ditr++)
 							{
-//std::cout << (ditr->_enq_flag?"enq":"deq") << ",rid=" << ditr->_rid << std::flush;
-//if (!ditr->_enq_flag) std::cout << ",drid=" << ditr->_drid << std::flush;
                                 if (ditr->_enq_flag)
                                     is_enq = ditr->_rid == _hdr._rid;
                                 else
                                     is_enq = ditr->_drid == _hdr._rid;
 							}
-//std::cout << "]" << std::flush;
 						}
                     }
                 }
 #endif
-//std::cout << (is_enq?":enq":":not-enq") << std::flush;
                 if (is_enq) // ok, this record is enqueued, check it, then read it...
                 {
-//std::cout << "e" << std::flush;
 #if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
                     // Is this locked by a pending dequeue transaction?
                     try
@@ -376,29 +358,22 @@
                     return res;
                 }
                 else // skip this record, it is already dequeued
-//{ std::cout << "d" << std::flush;
                     consume_xid_rec(_hdr, rptr, dtokp);
-//}
                 break;
             }
             case RHM_JDAT_DEQ_MAGIC:
-//std::cout << " D" << std::flush;
                 consume_xid_rec(_hdr, rptr, dtokp);
                 break;
             case RHM_JDAT_TXA_MAGIC:
-//std::cout << " A" << std::flush;
                 consume_xid_rec(_hdr, rptr, dtokp);
                 break;
             case RHM_JDAT_TXC_MAGIC:
-//std::cout << " C" << std::flush;
                 consume_xid_rec(_hdr, rptr, dtokp);
                 break;
             case RHM_JDAT_EMPTY_MAGIC:
-//std::cout << " X" << std::flush;
                 consume_filler();
                 break;
             default:
-//std::cout << " ?" << std::flush;
                 std::stringstream ss;
                 ss << std::hex << std::setfill('0');
                 ss << "Magic=0x" << std::setw(8) << _hdr._magic << std::dec;
@@ -437,8 +412,10 @@
         {
             std::stringstream ss;
             ss << "AIO read operation failed: " << strerror(-aioret) << " (" << aioret << ")";
-            ss << " [pg=" << pcbp->_index << " size=" << iocbp->u.c.nbytes;
-            ss << " offset=" << iocbp->u.c.offset << " fh=" << iocbp->aio_fildes << "]";
+            ss << " [pg=" << pcbp->_index << " buf=" << iocbp->u.c.buf;
+            ss << " rsize=0x" << std::hex << iocbp->u.c.nbytes;
+            ss << " offset=0x" << iocbp->u.c.offset << std::dec;
+            ss << " fh=" << iocbp->aio_fildes << "]";
             throw jexception(jerrno::JERR__AIO, ss.str().c_str(), "rmgr", "get_events");
         }
 
@@ -453,7 +430,6 @@
         // Use stored pointer to nlfh in the pcb instead.
         pcbp->_rdblks = iocbp->u.c.nbytes / JRNL_DBLK_SIZE;
         pcbp->_rfh->add_rd_cmpl_cnt_dblks(pcbp->_rdblks);
-//std::cout << "{r*" << pcbp->_index << " fid=" << pcbp->_rfh->fid() << "}" << std::flush;
 
         // Clean up this pcb's data_tok list
         pcbp->_pdtokl->clear();
@@ -468,11 +444,11 @@
 }
 
 void
-rmgr::recover_complete()
+rmgr::recover_complete(size_t fro)
 {
     _pg_index = 0;
     _pg_cntr = 0;
-    _pg_offset_dblks = 0;
+    _pg_offset_dblks = (fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE;
 }
 
 void
@@ -517,11 +493,9 @@
 {
     if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
     {
-//std::cout << " S=" << page_state_str(_page_cb_arr[_pg_index]._state) << std::flush;
         aio_cycle();   // check if any AIOs have returned
         return RHM_IORES_AIO_WAIT;
     }
-//std::cout << " T" << dtokp->dblocks_read() << "," << dblks_rem() << std::flush;
 
     // Read data from this page, first block will have header and data size.
     u_int32_t dblks_rd = _enq_rec.decode(h, rptr, dtokp->dblocks_read(), dblks_rem());
@@ -532,7 +506,6 @@
     // If data still incomplete, move to next page and decode again
     while (dtokp->dblocks_read() < _enq_rec.rec_size_dblks())
     {
-//std::cout << " U" << std::flush;
         rotate_page();
         if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
         {
@@ -549,9 +522,7 @@
 
     // If we have finished with this page, rotate it
     if (dblks_rem() == 0)
-//{std::cout << " V" << std::flush;
         rotate_page();
-//}    
 
     // Set the record size in dtokp
     dtokp->set_rstate(data_tok::READ);
@@ -610,19 +581,13 @@
 {
     u_int32_t dsize_dblks = jrec::size_dblks(dtokp->dsize());
     u_int32_t tot_dblk_cnt = dtokp->dblocks_read();
-//std::cout << " [S " << dsize_dblks << "," << tot_dblk_cnt << std::flush;
     while (true)
     {
         u_int32_t this_dblk_cnt = 0;
         if (dsize_dblks - tot_dblk_cnt > dblks_rem())
-//{std::cout << " x" << std::flush;
             this_dblk_cnt = dblks_rem();
-//}
         else
-//{std::cout << " f" << std::flush;
             this_dblk_cnt = dsize_dblks - tot_dblk_cnt;
-//}
-//std::cout << "->" << this_dblk_cnt << std::flush;
         if (this_dblk_cnt)
         {
             dtokp->incr_dblocks_read(this_dblk_cnt);
@@ -633,16 +598,12 @@
         if (tot_dblk_cnt < dsize_dblks)
         {
             if (_pg_offset_dblks == JRNL_SBLK_SIZE * JRNL_RMGR_PAGE_SIZE)
-//{ std::cout << " rot" << std::flush;
                 rotate_page();
-//}            
             if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
             {
                 dtokp->set_rstate(data_tok::SKIP_PART);
-//std::cout << " w]" << std::flush;
                 return RHM_IORES_AIO_WAIT;
             }
-//std::cout << " !" << std::flush;
         }
         else
         {
@@ -650,7 +611,6 @@
             dtokp->set_rstate(data_tok::UNREAD);
             dtokp->set_dsize(0);
             dtokp->set_dblocks_read(0);
-//std::cout << "]" << std::flush;
 
             // If we have finished with this page, rotate it
             if (dblks_rem() == 0)
@@ -728,7 +688,6 @@
             _aio_evt_rem++;
             _page_cb_arr[pi]._state = AIO_PENDING;
             _page_cb_arr[pi]._rfh = _rrfc.file_handle();
-//std::cout << "{r^" << pi << "," << rd_size << "}" << std::flush;
         }
         else // If there is nothing to read for this page, neither will there be for the others...
             break;
@@ -752,7 +711,6 @@
 rmgr::rotate_page()
 {
     _page_cb_arr[_pg_index]._rdblks = 0;
-//     pmgr::rotate_page();
     _page_cb_arr[_pg_index]._state = UNUSED;
     if (_pg_offset_dblks >= JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
     {

Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp	2007-12-05 03:18:17 UTC (rev 1430)
@@ -67,14 +67,15 @@
                 std::deque<data_tok*>* const dtokl) throw (jexception);
         virtual ~rmgr();
 
-        void initialize(std::deque<data_tok*>* const dtoklp, const aio_cb rd_cb) throw (jexception);
+        void initialize(std::deque<data_tok*>* const dtoklp, const aio_cb rd_cb, size_t fro)
+                throw (jexception);
         const iores get(const u_int64_t& rid, const size_t& dsize, const size_t& dsize_avail,
                 const void** const data, bool auto_discard) throw (jexception);
         const iores discard(data_tok* dtok) throw (jexception);
         const iores read(void** const datapp, size_t& dsize, void** const xidpp, size_t& xidsize,
                 bool& transient, bool& external, data_tok* dtokp) throw (jexception);
         const u_int32_t get_events(page_state state = AIO_COMPLETE) throw (jexception);
-        void recover_complete();
+        void recover_complete(size_t fro);
 
     private:
         void initialize() throw (jexception);

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-12-05 03:18:17 UTC (rev 1430)
@@ -59,7 +59,9 @@
         _abort_busy(false),
         _commit_busy(false),
         _txn_pending_set()
-{}
+{
+    assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0);
+}
 
 wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc, std::deque<data_tok*>* const dtoklp,
         const u_int32_t max_dtokpp, const u_int32_t max_iowait_us) throw (jexception):
@@ -78,7 +80,9 @@
         _abort_busy(false),
         _commit_busy(false),
         _txn_pending_set()
-{}
+{
+    assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0);
+}
 
 wmgr::~wmgr()
 {
@@ -87,13 +91,20 @@
 
 void
 wmgr::initialize(std::deque<data_tok*>* dtoklp, const aio_cb wr_cb, const u_int32_t max_dtokpp,
-        const u_int32_t max_iowait_us) throw (jexception)
+        const u_int32_t max_iowait_us, size_t eo) throw (jexception)
 {
     _dtoklp = dtoklp;
     _max_dtokpp = max_dtokpp;
     _max_io_wait_us = max_iowait_us;
     _cb = wr_cb;
     initialize();
+    if (eo)
+    {
+        const u_int32_t wr_pg_size_dblks = JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE;
+        u_int32_t data_dblks = (eo / JRNL_DBLK_SIZE) - 4; // 4 dblks for file hdr
+        _pg_cntr = data_dblks / wr_pg_size_dblks;
+        _pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks);
+    }
 }
 
 const iores
@@ -101,7 +112,6 @@
         data_tok* dtokp, const void* const xid_ptr, const size_t xid_len, const bool transient,
         const bool external) throw (jexception)
 {
-//std::cout << "wmgr::enqueue() dl=" << tot_data_len << " xl=" << xid_len << " t=" << (transient?"T":"F") << " e=" << (external?"T":"F") << " msg=" << (data_buff?std::string((const char*)data_buff, tot_data_len):"<null>") << std::endl;
     if (xid_len)
         assert(xid_ptr != NULL);
 
@@ -727,10 +737,8 @@
                 throw jexception(jerrno::JERR__AIO, "wmgr", "write_flush");
             _wrfc.add_subm_cnt_dblks(_cached_offset_dblks);
             _aio_evt_rem++;
-//std::cout << "{w^" << _pg_index << "," << _cached_offset_dblks << "}" << std::flush;
             _cached_offset_dblks = 0;
 
-//           rotate_page(AIO_PENDING); // increments _pg_index, resets _pg_offset_dblks if req'd
            rotate_page(); // increments _pg_index, resets _pg_offset_dblks if req'd
            if (_page_cb_arr[_pg_index]._state == UNUSED)
                _page_cb_arr[_pg_index]._state = IN_USE;
@@ -787,7 +795,6 @@
         }
         if (pcbp) // Page writes have pcb
         {
-//std::cout << "{w*" << pcbp->_index << "," << pcbp->_wdblks << "}" << std::flush;
             u_int32_t s = pcbp->_pdtokl->size();
             for (u_int32_t k=0; k<s; k++)
             {
@@ -1018,7 +1025,6 @@
     if (::io_submit(_ioctx, 1, &iocbp) < 0)
         throw jexception(jerrno::JERR__AIO, "wmgr", "write_fhdr");
     _aio_evt_rem++;
-//std::cout << "{f^" << fid << "," << JRNL_SBLK_SIZE << "," << iocbp << "}" << std::flush;
     _wrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
 }
 

Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp	2007-12-05 03:18:17 UTC (rev 1430)
@@ -104,7 +104,8 @@
         virtual ~wmgr();
 
         void initialize(std::deque<data_tok*>* const dtoklp, aio_cb wr_cb,
-                const u_int32_t max_dtokpp, const u_int32_t max_iowait_us) throw (jexception);
+                const u_int32_t max_dtokpp, const u_int32_t max_iowait_us, size_t eo = 0)
+                throw (jexception);
         const iores enqueue(const void* const data_buff, const size_t tot_data_len,
                 const size_t this_data_len, data_tok* dtokp, const void* const xid_ptr,
                 const size_t xid_len, const bool transient, const bool external) throw (jexception);

Modified: store/trunk/cpp/rhm.spec.in
===================================================================
--- store/trunk/cpp/rhm.spec.in	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/rhm.spec.in	2007-12-05 03:18:17 UTC (rev 1430)
@@ -3,7 +3,7 @@
 #
 Name:           rhm
 Version:        @VERSION@
-Release:        2%{?dist}
+Release:        3%{?dist}
 Summary:        Red Hat extensions to the Qpid messaging system
 Group:          System Environment/Libraries
 License:        LGPL

Modified: store/trunk/cpp/tests/OrderingTest.cpp
===================================================================
--- store/trunk/cpp/tests/OrderingTest.cpp	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/tests/OrderingTest.cpp	2007-12-05 03:18:17 UTC (rev 1430)
@@ -104,7 +104,7 @@
     void setup(bool async)
     {
         store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
-        store->init("/var",async,true);
+        store->init("/var", async, true, 4, 1);
 		store->truncate();
 
         queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0));
@@ -146,7 +146,7 @@
         store.reset();
 
         store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
-        store->init("/var",async);
+        store->init("/var", async, false, 4, 1);
         ExchangeRegistry exchanges;
         DtxManager mgr(store.get());
         RecoveryManagerImpl recoveryMgr(queues, exchanges, mgr, 0);

Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/tests/SimpleTest.cpp	2007-12-05 03:18:17 UTC (rev 1430)
@@ -116,7 +116,7 @@
     void testEmptyRecover(bool async)
     {
         BdbMessageStore store;
-        store.init("/var",async);
+        store.init("/var", async, false, 4, 1);
         store.truncate();//make sure it is empty to begin with
         QueueRegistry registry(&store);
         recover(store, registry);
@@ -129,7 +129,7 @@
     void testCreateDelete(bool async)
     {
         BdbMessageStore store;
-        store.init("/var",async, true);
+        store.init("/var", async, true, 4, 1);
         store.truncate();//make sure it is empty to begin with
         string name("CreateDeleteQueue");
         Queue queue(name, 0, &store, 0);
@@ -150,7 +150,7 @@
         string name("MyDurableQueue");
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             store.truncate();//make sure it is empty to begin with
             Queue queue(name, 0, &store, 0);
             store.create(queue);
@@ -159,7 +159,7 @@
         }//db will be closed
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             QueueRegistry registry(&store);
             recover(store, registry);
             Queue::shared_ptr queue = registry.find(name);
@@ -177,7 +177,7 @@
         string name("MyDurableQueue");
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             store.truncate();//make sure it is empty to begin with
             Queue queue(name, 0, &store, 0);
             FieldTable settings;
@@ -187,7 +187,7 @@
         }//db will be closed
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             QueueRegistry registry(&store);
             recover(store, registry);
             Queue::shared_ptr queue = registry.find(name);
@@ -205,7 +205,7 @@
         string name("MyDurableQueue");
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             store.truncate();//make sure it is empty to begin with
             Queue queue(name, 0, &store, 0);
             store.create(queue);
@@ -213,7 +213,7 @@
         }//db will be closed
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             QueueRegistry registry(&store);
             recover(store, registry);
             CPPUNIT_ASSERT(!registry.find(name));
@@ -236,7 +236,7 @@
         string data2("hijklmn");
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             store.truncate();//make sure it is empty to begin with
             Queue queue(name, 0, &store, 0);
             FieldTable settings;
@@ -255,7 +255,7 @@
         }//db will be closed
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             QueueRegistry registry(&store);
             recover(store, registry);
             Queue::shared_ptr queue = registry.find(name);
@@ -293,7 +293,7 @@
             string messageId = "MyMessage";
             string data("abcdefg");
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             store.truncate();//make sure it is empty to begin with
             Queue queue(name, 0, &store, 0);
             FieldTable settings;
@@ -308,7 +308,7 @@
         }//db will be closed
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             QueueRegistry registry(&store);
             recover(store, registry);
             Queue::shared_ptr queue = registry.find(name);
@@ -329,7 +329,7 @@
         const string data2("ABCDEFGHIJKLMNOPQRSTUVWXYZ");
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             store.truncate();//make sure it is empty to begin with
 
             //create & stage a message
@@ -376,7 +376,7 @@
         {
             //recover
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             QueueRegistry registry(&store);
             ExchangeRegistry exchanges;
             DtxManager dtx(&store);
@@ -423,7 +423,7 @@
     void testDestroyStagedMessage(bool async)
     {
         BdbMessageStore store;
-        store.init("/var",async);
+        store.init("/var", async, false, 4, 1);
         store.truncate();//make sure it is empty to begin with
         
         const string data("abcdefg");
@@ -449,7 +449,7 @@
     void testDestroyEnqueuedMessage(bool async)
     {
         BdbMessageStore store;
-        store.init("/var",async);
+        store.init("/var", async, false, 4, 1);
         store.truncate();//make sure it is empty to begin with
         
         const string data("abcdefg");
@@ -484,7 +484,7 @@
         args.setString("a", "A");
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             store.truncate();//make sure it is empty to begin with
             ExchangeRegistry registry;
             Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
@@ -494,7 +494,7 @@
         }//db will be closed
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             ExchangeRegistry registry;
 
             recover(store, registry);
@@ -508,7 +508,7 @@
         }
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             ExchangeRegistry registry;
 
             recover(store, registry);
@@ -532,7 +532,7 @@
         FieldTable args;
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             store.truncate();//make sure it is empty to begin with
             Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
             Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
@@ -543,7 +543,7 @@
         }//db will be closed
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             ExchangeRegistry exchanges;
             QueueRegistry queues;
 
@@ -557,7 +557,7 @@
         }
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             ExchangeRegistry exchanges;
             QueueRegistry queues;
 
@@ -581,7 +581,7 @@
         FieldTable args;
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             store.truncate();//make sure it is empty to begin with
             Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
             Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
@@ -596,7 +596,7 @@
         }//db will be closed
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             ExchangeRegistry exchanges;
             QueueRegistry queues;
 
@@ -612,7 +612,7 @@
         }
         {
             BdbMessageStore store;
-            store.init("/var",async);
+            store.init("/var", async, false, 4, 1);
             ExchangeRegistry exchanges;
             QueueRegistry queues;
 

Modified: store/trunk/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/trunk/cpp/tests/TransactionalTest.cpp	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/tests/TransactionalTest.cpp	2007-12-05 03:18:17 UTC (rev 1430)
@@ -95,7 +95,7 @@
     void setup(bool async)
     {
         store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
-        store->init("/var",async, true);
+        store->init("/var", async, true, 4, 1);
         store->truncate();
 
         //create two queues:
@@ -119,7 +119,7 @@
         store.reset();
 
         store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
-        store->init("/var",async);
+        store->init("/var", async, false, 4, 1);
         ExchangeRegistry exchanges;
         DtxManager mgr(store.get());
         RecoveryManagerImpl recovery(queues, exchanges, mgr, 0);

Modified: store/trunk/cpp/tests/TwoPhaseCommitTest.cpp
===================================================================
--- store/trunk/cpp/tests/TwoPhaseCommitTest.cpp	2007-12-04 21:47:02 UTC (rev 1429)
+++ store/trunk/cpp/tests/TwoPhaseCommitTest.cpp	2007-12-05 03:18:17 UTC (rev 1430)
@@ -336,7 +336,7 @@
     void setup()
     {
         store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
-        store->init("/var",async,true);
+        store->init("/var", async, true, 4, 1);
         store->truncate();
 
         //create two queues:
@@ -361,7 +361,7 @@
         store.reset();
 
         store = std::auto_ptr<BdbMessageStore>(new BdbMessageStore());
-        store->init("/var",async);
+        store->init("/var", async, false, 4, 1);
         ExchangeRegistry exchanges;
         dtxmgr = std::auto_ptr<DtxManager>(new DtxManager(store.get()));
         RecoveryManagerImpl recovery(queues, exchanges, *dtxmgr, 0);




More information about the rhmessaging-commits mailing list