Author: kpvdr
Date: 2008-10-14 08:04:52 -0400 (Tue, 14 Oct 2008)
New Revision: 2632
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jinf.cpp
store/trunk/cpp/lib/jrnl/jinf.hpp
store/trunk/cpp/lib/jrnl/lfmgr.cpp
store/trunk/cpp/lib/jrnl/rcvdat.hpp
store/trunk/cpp/lib/jrnl/rfc.cpp
store/trunk/cpp/lib/jrnl/rfc.hpp
store/trunk/cpp/lib/jrnl/rrfc.cpp
store/trunk/cpp/lib/jrnl/rrfc.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wrfc.cpp
store/trunk/cpp/lib/jrnl/wrfc.hpp
store/trunk/cpp/tests/jrnl/_st_basic.cpp
Log:
Installed lfmgr into jcntl which replaces local journal file count and fcntl pointer array
variables. The auto-expand functionality has not yet been wired in yet; this is a
functional replacement only.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-10-14 09:39:26 UTC (rev 2631)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-10-14 12:04:52 UTC (rev 2632)
@@ -144,9 +144,9 @@
if (_mgmtObject != 0)
{
- _mgmtObject->set_initialFileCount(_num_jfiles);
+ _mgmtObject->set_initialFileCount(_lfmgr.size());
_mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE *
JRNL_DBLK_SIZE);
- _mgmtObject->set_currentFileCount(_num_jfiles);
+ _mgmtObject->set_currentFileCount(_lfmgr.size());
_mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE *
JRNL_DBLK_SIZE);
_mgmtObject->set_writePages(wcache_num_pages);
}
@@ -172,9 +172,9 @@
if (_mgmtObject != 0)
{
- _mgmtObject->set_initialFileCount(_num_jfiles);
+ _mgmtObject->set_initialFileCount(_lfmgr.size());
_mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE *
JRNL_DBLK_SIZE);
- _mgmtObject->set_currentFileCount(_num_jfiles);
+ _mgmtObject->set_currentFileCount(_lfmgr.size());
_mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE *
JRNL_DBLK_SIZE);
_mgmtObject->set_writePages(wcache_num_pages);
}
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-10-14 09:39:26 UTC (rev 2631)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-10-14 12:04:52 UTC (rev 2632)
@@ -62,14 +62,12 @@
_stop_flag(false),
_readonly_flag(false),
_autostop(true),
- _num_jfiles(0),
_jfsize_sblks(0),
- _fc_arr(0),
-// _lfmgr(this),
+ _lfmgr(),
_emap(),
_tmap(),
- _rrfc(),
- _wrfc(),
+ _rrfc(&_lfmgr),
+ _wrfc(&_lfmgr),
_rmgr(this, _emap, _tmap, _rrfc),
_wmgr(this, _emap, _tmap, _wrfc),
_rcvdat()
@@ -83,12 +81,7 @@
if (_init_flag && !_stop_flag)
try { stop(true); }
catch (const jexception& e) { std::cerr << e << std::endl; }
- if (_fc_arr)
- {
- for (u_int32_t i=0; i<_num_jfiles; i++)
- delete _fc_arr[i];
- delete[] _fc_arr;
- }
+ _lfmgr.finalize();
::pthread_mutex_destroy(&_gev_mutex);
::pthread_mutex_destroy(&_wr_mutex);
}
@@ -105,20 +98,13 @@
_emap.clear();
_tmap.clear();
- // TODO - place this in a finalize() fn? - see ~jcntl() & recover()...
- if (_fc_arr)
- {
- for (u_int32_t i=0; i<_num_jfiles; i++)
- delete _fc_arr[i];
- delete[] _fc_arr;
- }
+ _lfmgr.finalize();
// Set new file geometry parameters
assert(num_jfiles >= JRNL_MIN_NUM_FILES);
assert(num_jfiles <= JRNL_MAX_NUM_FILES);
- _num_jfiles = num_jfiles;
- _emap.set_num_jfiles(_num_jfiles);
- _tmap.set_num_jfiles(_num_jfiles);
+ _emap.set_num_jfiles(num_jfiles);
+ _tmap.set_num_jfiles(num_jfiles);
assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE);
assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE);
@@ -126,21 +112,10 @@
// Clear any existing journal files
_jdir.clear_dir();
+ _lfmgr.initialize(num_jfiles, this, &new_fcntl);
- _fc_arr = new fcntl*[_num_jfiles];
- // Zero the pointer array first because new() can throw exceptions
- for (u_int16_t i=0; i<_num_jfiles; i++)
- _fc_arr[i] = 0;
- for (u_int16_t i=0; i<_num_jfiles; i++)
- {
- std::ostringstream oss;
- oss << _jdir << "/" << _base_filename;
- // TODO: resolve fid/lid in following stmt:
- _fc_arr[i] = new fcntl(oss.str(), i, i, _jfsize_sblks, 0);
- }
-
- _wrfc.initialize(_num_jfiles, _jfsize_sblks, _fc_arr);
- _rrfc.initialize(_num_jfiles, _fc_arr);
+ _wrfc.initialize(_jfsize_sblks);
+ _rrfc.initialize();
_rrfc.set_findex(0);
_rmgr.initialize(rd_cb);
_wmgr.initialize(wr_cb, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
@@ -165,20 +140,13 @@
_emap.clear();
_tmap.clear();
- // TODO - place this in a finalize() fn? - see ~jcntl() & initialize()...
- if (_fc_arr)
- {
- for (u_int32_t i=0; i<_num_jfiles; i++)
- delete _fc_arr[i];
- delete[] _fc_arr;
- }
+ _lfmgr.finalize();
// Set new file geometry parameters
assert(num_jfiles >= JRNL_MIN_NUM_FILES);
assert(num_jfiles <= JRNL_MAX_NUM_FILES);
- _num_jfiles = num_jfiles;
- _emap.set_num_jfiles(_num_jfiles);
- _tmap.set_num_jfiles(_num_jfiles);
+ _emap.set_num_jfiles(num_jfiles);
+ _tmap.set_num_jfiles(num_jfiles);
assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE);
assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE);
@@ -186,33 +154,22 @@
// Verify journal dir and journal files
_jdir.verify_dir();
- _rcvdat.reset(_num_jfiles);
+ _rcvdat.reset(num_jfiles);
rcvr_janalyze(_rcvdat, prep_txn_list_ptr);
highest_rid = _rcvdat._h_rid;
if (_rcvdat._jfull)
throw jexception(jerrno::JERR_JCNTL_RECOVERJFULL, "jcntl",
"recover");
-
this->log(LOG_DEBUG, _rcvdat.to_log(_jid));
- _fc_arr = new fcntl*[_num_jfiles];
- // 0 the pointer array first because new() can throw exceptions
- for (u_int16_t i=0; i<_num_jfiles; i++)
- _fc_arr[i] = 0;
- for (u_int16_t i=0; i<_num_jfiles; i++)
- {
- std::ostringstream oss;
- oss << _jdir << "/" << _base_filename;
- // TODO: resolve fid/lid in following stmt:
- _fc_arr[i] = new fcntl(oss.str(), i, i, _jfsize_sblks, &_rcvdat);
- }
+ _lfmgr.recover(_rcvdat, this, &new_fcntl);
- _wrfc.initialize(_num_jfiles, _jfsize_sblks, _fc_arr, &_rcvdat);
- _rrfc.initialize(_num_jfiles, _fc_arr);
+ _wrfc.initialize(_jfsize_sblks, &_rcvdat);
+ _rrfc.initialize();
_rrfc.set_findex(_rcvdat.ffid());
_rmgr.initialize(rd_cb);
- _wmgr.initialize(wr_cb, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
- JRNL_WMGR_MAXWAITUS, (_rcvdat._lffull ? 0 : _rcvdat._eo));
+ _wmgr.initialize(wr_cb, wcache_pgsize_sblks, wcache_num_pages, JRNL_WMGR_MAXDTOKPP,
JRNL_WMGR_MAXWAITUS,
+ (_rcvdat._lffull ? 0 : _rcvdat._eo));
_readonly_flag = true;
_init_flag = true;
@@ -223,10 +180,10 @@
{
if (!_readonly_flag)
throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl",
"recover_complete");
- for (u_int16_t i=0; i<_num_jfiles; i++)
- _fc_arr[i]->reset(&_rcvdat);
- _wrfc.initialize(_num_jfiles, _jfsize_sblks, _fc_arr, &_rcvdat);
- _rrfc.initialize(_num_jfiles, _fc_arr);
+ for (u_int16_t i=0; i<_lfmgr.size(); i++)
+ _lfmgr.get_fcntlp(i)->reset(&_rcvdat);
+ _wrfc.initialize(_jfsize_sblks, &_rcvdat);
+ _rrfc.initialize();
_rrfc.set_findex(_rcvdat.ffid());
_rmgr.recover_complete();
_readonly_flag = false;
@@ -430,7 +387,7 @@
u_int16_t fid = _wrfc.index();
while ( _emap.get_enq_cnt(ffid) == 0 && _tmap.get_txn_fid_cnt(ffid) == 0
&& ffid != fid)
{
- if (++ffid >= _num_jfiles)
+ if (++ffid >= _lfmgr.size())
ffid = 0;
}
if (!_rrfc.is_active())
@@ -518,7 +475,7 @@
oss << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR__RTCLOCK, oss.str(), "jcntl",
"write_infofile");
}
- jinf ji(_jid, _jdir.dirname(), _base_filename, _num_jfiles, _jfsize_sblks,
+ jinf ji(_jid, _jdir.dirname(), _base_filename, _lfmgr.size(), _jfsize_sblks,
_wmgr.cache_pgsize_sblks(), _wmgr.cache_num_pages(), ts);
ji.write();
}
@@ -593,14 +550,14 @@
// If the number of files does not tie up with the jinf file from the journal being
recovered,
// use the jinf data.
- if (_num_jfiles != ji.num_jfiles())
+ if (rd._num_jfiles != ji.num_jfiles())
{
std::ostringstream oss;
oss << "Recovery found " << ji.num_jfiles() <<
- " files (different from --num-jfiles value of " <<
_num_jfiles << ").";
+ " files (different from --num-jfiles value of " <<
rd._num_jfiles << ").";
this->log(LOG_WARN, oss.str());
- _num_jfiles = ji.num_jfiles();
- _rcvdat._enq_cnt_list.resize(_num_jfiles);
+ rd._num_jfiles = ji.num_jfiles();
+ _rcvdat._enq_cnt_list.resize(rd._num_jfiles);
}
if (_jfsize_sblks != ji.jfsize_sblks())
{
@@ -627,6 +584,7 @@
rd._owi = ji.get_initial_owi();
rd._frot = ji.get_frot();
rd._jempty = false;
+ ji.get_normalized_fid_list(rd._fid_list);
}
catch (const jexception& e)
{
@@ -648,7 +606,8 @@
_tmap.xid_list(xid_list);
for (std::vector<std::string>::iterator itr = xid_list.begin(); itr !=
xid_list.end(); itr++)
{
- std::vector<std::string>::const_iterator pitr =
std::find(prep_txn_list_ptr->begin(), prep_txn_list_ptr->end(), *itr);
+ std::vector<std::string>::const_iterator pitr =
+ std::find(prep_txn_list_ptr->begin(),
prep_txn_list_ptr->end(), *itr);
if (pitr == prep_txn_list_ptr->end()) // not found in prepared list
{
txn_data_list tdl = _tmap.get_remove_tdata_list(*itr);
@@ -668,7 +627,7 @@
rd._lffull = rd._eo == (1 + _jfsize_sblks) * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE;
// Check for journal full condition
- u_int16_t next_wr_fid = (rd._lfid + 1) % _num_jfiles;
+ u_int16_t next_wr_fid = (rd._lfid + 1) % rd._num_jfiles;
rd._jfull = rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid];
}
}
@@ -892,7 +851,7 @@
rd._eo = ifsp->tellg(); // remember file offset before closing
assert(rd._eo != std::numeric_limits<std::size_t>::max()); // Check for
error code -1
ifsp->close();
- if (++fid >= _num_jfiles)
+ if (++fid >= rd._num_jfiles)
{
fid = 0;
lowi = !lowi; // Flip local owi
@@ -937,7 +896,7 @@
{
if (rd._ffid ? h.get_owi() == lowi : h.get_owi() != lowi) // Overwrite indicator
changed
{
- u_int16_t expected_fid = rd._ffid ? rd._ffid - 1 : _num_jfiles - 1;
+ u_int16_t expected_fid = rd._ffid ? rd._ffid - 1 : rd._num_jfiles - 1;
if (fid == expected_fid)
{
check_journal_alignment(fid, file_pos);
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-10-14 09:39:26 UTC (rev 2631)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-10-14 12:04:52 UTC (rev 2632)
@@ -129,10 +129,8 @@
bool _autostop; ///< Autostop flag - stops journal when overrun
occurs
// Journal control structures
- u_int16_t _num_jfiles; ///< Number of journal files
u_int32_t _jfsize_sblks; ///< Journal file size in sblks
- fcntl** _fc_arr; ///< Array of pointers to data file controllers
-// lfmgr _lfmgr; ///< LID-FID manager tracks inserted journal
files
+ lfmgr _lfmgr; ///< LID-FID manager tracks inserted journal
files
enq_map _emap; ///< Enqueue map for low water mark management
txn_map _tmap; ///< Transaction map open transactions
rrfc _rrfc; ///< Read journal rotating file controller
@@ -548,13 +546,13 @@
{ return _wrfc.aio_outstanding_dblks(); }
inline u_int32_t get_wr_outstanding_aio_dblks(u_int16_t pi) const
- { return _wrfc.file_controller(pi)->wr_aio_outstanding_dblks(); }
+ { return _lfmgr.get_fcntlp(pi)->wr_aio_outstanding_dblks(); }
inline u_int32_t get_rd_outstanding_aio_dblks() const
{ return _rrfc.aio_outstanding_dblks(); }
inline u_int32_t get_rd_outstanding_aio_dblks(u_int16_t pi) const
- { return _rrfc.file_controller(pi)->rd_aio_outstanding_dblks(); }
+ { return _lfmgr.get_fcntlp(pi)->rd_aio_outstanding_dblks(); }
inline u_int16_t get_rd_fid() const { return _rrfc.index(); }
inline u_int16_t get_wr_fid() const { return _wrfc.index(); }
@@ -618,8 +616,10 @@
*/
inline const std::string& base_filename() const { return _base_filename; }
- inline u_int16_t num_jfiles() const { return _num_jfiles; }
+ inline u_int16_t num_jfiles() const { return _lfmgr.size(); }
+ inline fcntl* get_fcntlp(const u_int16_t fid) const { return
_lfmgr.get_fcntlp(fid); }
+
inline u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
// Logging
@@ -640,8 +640,6 @@
static fcntl* new_fcntl(jcntl* const jcp, const std::size_t lid, const
std::size_t fid,
const rcvdat* const rdp);
-
-
protected:
/**
* \brief Check status of journal before allowing write operations.
Modified: store/trunk/cpp/lib/jrnl/jinf.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.cpp 2008-10-14 09:39:26 UTC (rev 2631)
+++ store/trunk/cpp/lib/jrnl/jinf.cpp 2008-10-14 12:04:52 UTC (rev 2632)
@@ -238,6 +238,23 @@
return _fidl;
}
+void
+jinf::get_normalized_fid_list(fid_list& fidl)
+{
+ if (!_analyzed_flag)
+ analyze();
+ fidl.clear();
+ u_int16_t s = _fidl.size();
+ u_int16_t iz = 0; // index of 0 value
+ while (_fidl[iz] && iz < s)
+ iz++;
+ assert(_fidl[iz] == 0);
+ for (u_int16_t i = iz; i < iz + s; i++)
+ fidl.push_back(_fidl[i % s]);
+ assert(fidl[0] == 0);
+ assert(fidl.size() == s);
+}
+
bool
jinf::get_initial_owi()
{
Modified: store/trunk/cpp/lib/jrnl/jinf.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.hpp 2008-10-14 09:39:26 UTC (rev 2631)
+++ store/trunk/cpp/lib/jrnl/jinf.hpp 2008-10-14 12:04:52 UTC (rev 2632)
@@ -49,6 +49,7 @@
{
public:
typedef std::vector<u_int16_t> fid_list;
+ typedef fid_list::const_iterator fidl_citr;
private:
u_int8_t _jver;
@@ -103,6 +104,7 @@
u_int16_t get_first_fid();
u_int16_t get_last_fid();
fid_list& get_fid_list();
+ void get_normalized_fid_list(fid_list& fidl);
bool get_initial_owi();
bool get_frot();
Modified: store/trunk/cpp/lib/jrnl/lfmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/lfmgr.cpp 2008-10-14 09:39:26 UTC (rev 2631)
+++ store/trunk/cpp/lib/jrnl/lfmgr.cpp 2008-10-14 12:04:52 UTC (rev 2632)
@@ -81,7 +81,7 @@
if (fid < rd._fid_list.size())
_fcntl_arr[lid_list[fid]] = new_obj_fn(jcp, lid_list[fid], fid, &rd);
else
- _fcntl_arr[fid] = new_obj_fn(jcp, fid, fid, 0);
+ _fcntl_arr[fid] = new_obj_fn(jcp, fid, fid, &rd);
}
void
Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp 2008-10-14 09:39:26 UTC (rev 2631)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp 2008-10-14 12:04:52 UTC (rev 2632)
@@ -115,20 +115,19 @@
oss << " Overwrite indicator (_owi) = " << (_owi ?
"TRUE" : "FALSE") << std::endl;
oss << " First rotation (_frot) = " << (_frot ?
"TRUE" : "FALSE") << std::endl;
oss << " Journal empty (_jempty) = " << (_jempty ?
"TRUE" : "FALSE") << std::endl;
- oss << " First fid (_ffid) = " << _ffid <<
std::endl;
+ oss << " First (earliest) fid (_ffid) = " << _ffid
<< std::endl;
oss << " First record offset in first fid (_fro) = 0x"
<< std::hex << _fro <<
std::dec << " (" << (_fro/JRNL_DBLK_SIZE)
<< " dblks)" << std::endl;
- oss << " Last fid (_lfid) = " << _lfid <<
std::endl;
+ oss << " Last (most recent) fid (_lfid) = " <<
_lfid << std::endl;
oss << " End offset (_eo) = 0x" << std::hex
<< _eo << std::dec << " (" <<
(_eo/JRNL_DBLK_SIZE) << " dblks)" <<
std::endl;
oss << " Highest rid (_h_rid) = 0x" << std::hex
<< _h_rid << std::dec << std::endl;
oss << " Last file full (_lffull) = " << (_lffull
? "TRUE" : "FALSE") << std::endl;
oss << " Journal full (_jfull) = " << (_jfull ?
"TRUE" : "FALSE") << std::endl;
- oss << " Fid list (_fid_list) = [";
+ oss << " Normalized fid list (_fid_list) = [";
for (std::vector<u_int16_t>::const_iterator i = _fid_list.begin();
i < _fid_list.end(); i++)
{
- if (i != _fid_list.begin())
- oss << ", ";
+ if (i != _fid_list.begin()) oss << ", ";
oss << *i;
}
oss << "]" << std::endl;
Modified: store/trunk/cpp/lib/jrnl/rfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rfc.cpp 2008-10-14 09:39:26 UTC (rev 2631)
+++ store/trunk/cpp/lib/jrnl/rfc.cpp 2008-10-14 12:04:52 UTC (rev 2632)
@@ -37,32 +37,23 @@
namespace journal
{
-rfc::rfc(): _nfiles(0), _fc_arr(0), _fc_index(0), _curr_fc(0)
+rfc::rfc(const lfmgr* lfmp): _lfmp(lfmp), _fc_index(0), _curr_fc(0)
{}
rfc::~rfc()
{}
void
-rfc::initialize(const u_int16_t nfiles, fcntl** fc_arr)
-{
- _nfiles = nfiles;
- _fc_arr = fc_arr;
-}
-
-void
rfc::finalize()
{
unset_findex();
- _nfiles = 0;
- _fc_arr = 0;
}
void
rfc::set_findex(const u_int16_t fc_index)
{
_fc_index = fc_index;
- _curr_fc = _fc_arr[_fc_index];
+ _curr_fc = _lfmp->get_fcntlp(fc_index);
_curr_fc->rd_reset();
}
@@ -73,11 +64,16 @@
_curr_fc = 0;
}
-fcntl*
-rfc::file_controller(const u_int16_t pg_index) const
+std::string
+rfc::status_str() const
{
- assert(pg_index < _nfiles);
- return _fc_arr[pg_index];
+ if (!_lfmp->is_init())
+ return "state: Uninitialized";
+ if (_curr_fc == 0)
+ return "state: Inactive";
+ std::ostringstream oss;
+ oss << "state: Active";
+ return oss.str();
}
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/rfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rfc.hpp 2008-10-14 09:39:26 UTC (rev 2631)
+++ store/trunk/cpp/lib/jrnl/rfc.hpp 2008-10-14 12:04:52 UTC (rev 2632)
@@ -41,7 +41,7 @@
}
}
-#include "jrnl/fcntl.hpp"
+#include "jrnl/lfmgr.hpp"
#include "jrnl/enums.hpp"
namespace mrg
@@ -57,58 +57,55 @@
* The following states exist in this class:
*
* <pre>
- * is_initialized()
is_active()
- * +===+ _nfiles == 0
- * +---------->| | Uninitialized: _fc_arr == 0 F
F
- * | +-->+===+ --+ _curr_fc == 0
+ * is_init()
is_active()
+ * +===+ _lfmp.is_init() == false
+ * +---------->| | Uninitialized: _curr_fc == 0 F
F
+ * | +-->+===+ --+
* | | |
* | | |
* | finalize() initialize()
* | | |
* | | |
- * | +-- +===+<--+ _nfiles > 0
- * finalize() | | Inactive: _fc_arr != 0 T F
- * | +-->+===+ --+ _curr_fc == 0
+ * | +-- +===+<--+ _lfmp.is_init() == true
+ * finalize() | | Inactive: _curr_fc == 0 T
F
+ * | +-->+===+ --+
* | | |
* | | |
* | unset_findex() set_findex()
* | | |
* | | |
- * | +-- +===+<--+ _nfiles > 0
- * +---------- | | Active: _fc_arr != 0 T T
- * +===+ _curr_fc != 0
+ * | +-- +===+<--+ _lfmp.is_init() == true
+ * +---------- | | Active: _curr_fc != 0 T
T
+ * +===+
* </pre>
*
* The Uninitialized state is where the class starts after construction. Once the
number of files is known and
* the array of file controllers allocated, then initialize() is called to set these,
causing the state to move
* to Inactive.
*
- * Once the index of the active file is known, then calling set_findex() will set the
index and internal pointer
+ * The Inactive state has the file controllers allocated and pointing to their
respective journal files, but no
+ * current file controller has been selected. The pointer to the current file
controller _curr_fc is null. Once the
+ * index of the active file is known, then calling set_findex() will set the index and
internal pointer
* to the currently active file controller. This moves the state to Active.
*
- * Note that if the number of files change, then the object will have to be reset and
reinitialized with a new array
- * of fcntl objects of the appropriate size.
+ * Note TODO: Comment on sync issues between change in num files in _lfmp and
_fc_index/_curr_fc.
*/
class rfc
{
protected:
- u_int16_t _nfiles; ///< Number of data files
- fcntl** _fc_arr; ///< Array of pointers to data file controllers
+ const lfmgr* _lfmp; ///< Pointer to jcntl's lfmgr instance containing
lid/fid map and fcntl objects
u_int16_t _fc_index; ///< Index of current file controller
fcntl* _curr_fc; ///< Pointer to current file controller
public:
- rfc();
+ rfc(const lfmgr* lfmp);
virtual ~rfc();
/**
* \brief Initialize the controller, moving from state Uninitialized to Inactive.
The main function of
* initialize() is to set the number of files and the pointer to the fcntl
array.
- * \param nfiles Number of files in the rotating file group.
- * \param fc_arr Pointer to an array of file controller, each of which correspond
to one of
- * the physical journal files.
*/
- virtual void initialize(const u_int16_t nfiles, fcntl** fc_arr);
+ virtual inline void initialize() {}
/**
* \brief Reset the controller to Uninitialized state, usually called when the
journal is stopped. Once called,
@@ -119,9 +116,14 @@
/**
* /brief Check initialization state: true = Not Uninitialized, ie Initialized or
Active; false = Uninitialized.
*/
- virtual inline bool is_initialized() const { return _nfiles == 0 || _fc_arr == 0;
}
+ virtual inline bool is_init() const { return _lfmp->is_init(); }
/**
+ * /brief Check active state: true = Initialized and _curr_fc not null; false
otherwise.
+ */
+ virtual inline bool is_active() const { return _lfmp->is_init() &&
_curr_fc != 0; }
+
+ /**
* /brief Sets the current file index and active fcntl object. Moves to state
Active.
*/
virtual void set_findex(const u_int16_t fc_index);
@@ -132,11 +134,6 @@
virtual void unset_findex();
/**
- * /brief Check the file handle state: true = Active; false = Not Active, ie
Inactive or Uninitialized.
- */
- virtual inline bool is_active() const { return _curr_fc != 0; }
-
- /**
* \brief Rotate active file controller to next file in rotating file group.
* \exception jerrno::JERR__NINIT if called before calling initialize().
*/
@@ -153,11 +150,6 @@
inline fcntl* file_controller() const { return _curr_fc; }
/**
- * \brief Returns the journal file controller for the given page index within the
rotating file group.
- */
- fcntl* file_controller(const u_int16_t pg_index) const;
-
- /**
* \brief Returns the currently active file id (fid)
*/
inline u_int16_t fid() const { return _curr_fc->fid(); }
@@ -167,11 +159,13 @@
inline u_int32_t enqcnt() const { return _curr_fc->enqcnt(); }
inline u_int32_t incr_enqcnt() { return _curr_fc->incr_enqcnt(); }
- inline u_int32_t incr_enqcnt(u_int16_t fid) { return
_fc_arr[fid]->incr_enqcnt(); }
- inline u_int32_t add_enqcnt(u_int32_t a) { return _curr_fc->add_enqcnt(a); }
- inline u_int32_t add_enqcnt(u_int16_t fid, u_int32_t a) { return
_fc_arr[fid]->add_enqcnt(a); }
- inline u_int32_t decr_enqcnt(u_int16_t fid) { return
_fc_arr[fid]->decr_enqcnt(); }
- inline u_int32_t subtr_enqcnt(u_int16_t fid, u_int32_t s) { return
_fc_arr[fid]->subtr_enqcnt(s); }
+ inline u_int32_t incr_enqcnt(const u_int16_t fid) { return
_lfmp->get_fcntlp(fid)->incr_enqcnt(); }
+ inline u_int32_t add_enqcnt(const u_int32_t a) { return
_curr_fc->add_enqcnt(a); }
+ inline u_int32_t add_enqcnt(const u_int16_t fid, const u_int32_t a)
+ { return _lfmp->get_fcntlp(fid)->add_enqcnt(a); }
+ inline u_int32_t decr_enqcnt(const u_int16_t fid) { return
_lfmp->get_fcntlp(fid)->decr_enqcnt(); }
+ inline u_int32_t subtr_enqcnt(const u_int16_t fid, const u_int32_t s)
+ { return _lfmp->get_fcntlp(fid)->subtr_enqcnt(s); }
virtual inline u_int32_t subm_cnt_dblks() const = 0;
virtual inline std::size_t subm_offs() const = 0;
@@ -190,7 +184,7 @@
virtual inline bool file_rotate() const = 0;
// Debug aid
- virtual std::string status_str() const = 0;
+ virtual std::string status_str() const;
}; // class rfc
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/rrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-10-14 09:39:26 UTC (rev 2631)
+++ store/trunk/cpp/lib/jrnl/rrfc.cpp 2008-10-14 12:04:52 UTC (rev 2632)
@@ -41,7 +41,7 @@
namespace journal
{
-rrfc::rrfc(): rfc(), _fh(-1), _valid(false)
+rrfc::rrfc(const lfmgr* lfmp): rfc(lfmp), _fh(-1), _valid(false)
{}
rrfc::~rrfc()
@@ -50,13 +50,6 @@
}
void
-rrfc::initialize(const u_int16_t nfiles, fcntl** fc_arr)
-{
- rfc::initialize(nfiles, fc_arr);
- _valid = false;
-}
-
-void
rrfc::finalize()
{
unset_findex();
@@ -81,24 +74,25 @@
iores
rrfc::rotate()
{
- if (!_nfiles)
+ if (!_lfmp->size())
throw jexception(jerrno::JERR__NINIT, "rrfc", "rotate");
u_int16_t next_fc_index = _fc_index + 1;
- if (next_fc_index == _nfiles)
+ if (next_fc_index == _lfmp->size())
next_fc_index = 0;
- fcntl* next_fc = _fc_arr[next_fc_index];
+ fcntl* next_fc = _lfmp->get_fcntlp(next_fc_index);
_fc_index = next_fc_index;
_curr_fc = next_fc;
open_fh(_curr_fc->fname());
return RHM_IORES_SUCCESS;
}
-// TODO: update this to reflect all status data
std::string
rrfc::status_str() const
{
std::ostringstream oss;
- oss << "rrfc[" << _fc_index << "]: " <<
_curr_fc->status_str();
+ oss << "rrfc: " << rfc::status_str();
+ if (is_active())
+ oss << " fcntl[" << _fc_index << "]: "
<< _curr_fc->status_str();
return oss.str();
}
Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp 2008-10-14 09:39:26 UTC (rev 2631)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp 2008-10-14 12:04:52 UTC (rev 2632)
@@ -59,27 +59,26 @@
* active file for reading. Similarly, unset_findex() closes this file handle.
*
* <pre>
- * is_initialized()
is_active()
- * +===+ _nfiles == 0
- * +---------->| | Uninitialized: _fc_arr == 0 F
F
- * | +-->+===+ --+ _curr_fc == 0
- * | | | _fh == -1
+ * is_init()
is_active()
+ * +===+ _lfmp.is_init() == false
+ * +---------->| | Uninitialized: _curr_fc == 0 F
F
+ * | +-->+===+ --+ _fh == -1
* | | |
+ * | | |
* | finalize() initialize()
* | | |
* | | |
- * | +-- +===+<--+ _nfiles > 0
- * finalize() | | Inactive: _fc_arr != 0 T F
- * | +-->+===+ --+ _curr_fc == 0
- * | | | _fh == -1
+ * | +-- +===+<--+ _lfmp.is_init() == true
+ * finalize() | | Inactive: _curr_fc == 0 T
F
+ * | +-->+===+ --+ _fh == -1
* | | |
+ * | | |
* | unset_findex() set_findex()
* | | |
* | | |
- * | +-- +===+<--+ _nfiles > 0
- * +---------- | | Active: _fc_arr != 0 T T
- * +===+ _curr_fc != 0
- * _fh >= 0
+ * | +-- +===+<--+ _lfmp.is_init() == true
+ * +---------- | | Active: _curr_fc != 0 T
T
+ * +===+ _fh >= 0
* </pre>
*
* In adition to the states above, class rrfc contains a validity flag. This is
operated indepenedently of the state
@@ -94,17 +93,14 @@
bool _valid; ///< Flag is true when read pages contain vailid data
public:
- rrfc();
+ rrfc(const lfmgr* lfmp);
virtual ~rrfc();
/**
* \brief Initialize the controller, moving from state Uninitialized to
Initialized. The main function of
* initialize() is to set the number of files and the pointer to the fcntl
array.
- * \param nfiles Number of files in the rotating file group.
- * \param fc_arr Pointer to an array of file controller, each of which correspond
to one of
- * the physical journal files.
*/
- virtual void initialize(const u_int16_t nfiles, fcntl** fc_arr);
+ inline void initialize() { rfc::initialize(); _valid = false; }
/**
* \brief Reset the controller to Uninitialized state, usually called when the
journal is stopped. Once called,
@@ -115,17 +111,17 @@
/**
* /brief Opens the file handle for reading a particular fid. Moves to state
open.
*/
- virtual void set_findex(const u_int16_t fc_index);
+ void set_findex(const u_int16_t fc_index);
/**
* /brief Closes the read file handle and nulls the active fcntl pointer. Moves to
state closed.
*/
- virtual void unset_findex();
+ void unset_findex();
/**
* /brief Check the state: true = open; false = closed.
*/
- virtual inline bool is_active() const { return _curr_fc != 0 && _fh >=
0; }
+ inline bool is_active() const { return _curr_fc != 0 && _fh >= 0; }
/**
* /brief Sets the validity flag which indicates that the read buffers contain
valid data for reading.
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-10-14 09:39:26 UTC (rev 2631)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2008-10-14 12:04:52 UTC (rev 2632)
@@ -874,7 +874,7 @@
// get fid from original file header record, update pointers for that fid
file_hdr* fhp = (file_hdr*)aiocbp->u.c.buf;
u_int32_t fid = fhp->_fid;
- fcntl* fcntlp = _wrfc.file_controller(fid);
+ fcntl* fcntlp = _jc->get_fcntlp(fid);
fcntlp->add_wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
}
}
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-10-14 09:39:26 UTC (rev 2631)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2008-10-14 12:04:52 UTC (rev 2632)
@@ -39,17 +39,12 @@
namespace journal
{
-wrfc::wrfc():
- rfc(),
+wrfc::wrfc(const lfmgr* lfmp):
+ rfc(lfmp),
_fsize_sblks(0),
_fsize_dblks(0),
_enq_cap_offs_dblks(0),
-#ifdef DRHM_TESTVALS
- // TODO: Find method of specifying 64-bit literals under gcc with -pedantic
option
- _rid(u_int64_t(0xffeeddcc) << 32), // For testing high rids
-#else
_rid(0),
-#endif
_reset_ok(false),
_owi(false),
_frot(true)
@@ -59,14 +54,12 @@
{}
void
-wrfc::initialize(const u_int16_t nfiles, const u_int32_t fsize_sblks, fcntl** fc_arr,
rcvdat* rdp)
+wrfc::initialize(const u_int32_t fsize_sblks, rcvdat* rdp)
{
if (rdp)
{
- _nfiles = nfiles;
- _fc_arr = fc_arr;
_fc_index = rdp->_lfid;
- _curr_fc = _fc_arr[rdp->_lfid];
+ _curr_fc = _lfmp->get_fcntlp(_fc_index);
_curr_fc->wr_reset(rdp);
_rid = rdp->_h_rid + 1;
_reset_ok = true;
@@ -77,19 +70,14 @@
}
else
{
- rfc::initialize(nfiles, fc_arr);
+ rfc::initialize();
rfc::set_findex(0);
-#ifdef DRHM_TESTVALS
- _rid = u_int64_t(0xffeeddcc) << 32;
-#else
_rid = 0ULL;
-#endif
_reset_ok = false;
}
_fsize_sblks = fsize_sblks;
_fsize_dblks = fsize_sblks * JRNL_SBLK_SIZE;
- _enq_cap_offs_dblks = (u_int32_t)std::ceil(_fsize_dblks * _nfiles *
- (100.0 - JRNL_ENQ_THRESHOLD) / 100);
+ _enq_cap_offs_dblks = (u_int32_t)std::ceil(_fsize_dblks * _lfmp->size() * (100.0 -
JRNL_ENQ_THRESHOLD) / 100);
// Check the offset is at least one file; if not, make it so
if (_enq_cap_offs_dblks < _fsize_dblks)
_enq_cap_offs_dblks = _fsize_dblks;
@@ -97,16 +85,16 @@
iores wrfc::rotate()
{
- if (!_nfiles)
+ if (!_lfmp->size())
throw jexception(jerrno::JERR__NINIT, "wrfc", "rotate");
_fc_index++;
- if (_fc_index == _nfiles)
+ if (_fc_index == _lfmp->size())
{
_fc_index = 0;
_owi = !_owi;
_frot = false;
}
- _curr_fc = _fc_arr[_fc_index];
+ _curr_fc = _lfmp->get_fcntlp(_fc_index);
if (_curr_fc->aio_cnt())
return RHM_IORES_FILE_AIOWAIT;
if (!wr_reset()) //Checks if file is still in use (ie not fully dequeued yet)
@@ -119,7 +107,7 @@
if (_frot)
return 0;
u_int16_t next_index = _fc_index + 1;
- if (next_index >= _nfiles)
+ if (next_index >= _lfmp->size())
next_index = 0;
return next_index;
}
@@ -141,9 +129,9 @@
fwd_dblks -= fwd_dblks > _fsize_dblks ? _fsize_dblks : fwd_dblks;
if (fwd_dblks)
{
- if (++findex == _nfiles)
+ if (++findex == _lfmp->size())
findex = 0;
- fcp = _fc_arr[findex];
+ fcp = _lfmp->get_fcntlp(findex);
}
in_use |= fcp->enqcnt() > 0;
}
@@ -162,7 +150,9 @@
wrfc::status_str() const
{
std::ostringstream oss;
- oss << "wrfc[" << _fc_index << "]: " <<
_curr_fc->status_str();
+ oss << "wrfc: " << rfc::status_str();
+ if (is_active())
+ oss << " fcntl[" << _fc_index << "]: "
<< _curr_fc->status_str();
return oss.str();
}
Modified: store/trunk/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.hpp 2008-10-14 09:39:26 UTC (rev 2631)
+++ store/trunk/cpp/lib/jrnl/wrfc.hpp 2008-10-14 12:04:52 UTC (rev 2632)
@@ -64,20 +64,15 @@
bool _frot; ///< Flag is true for first rotation, false
otherwise
public:
- wrfc();
+ wrfc(const lfmgr* lfmp);
virtual ~wrfc();
/**
* \brief Initialize the controller.
- * \param nfiles Number of files in the rotating file group.
* \param fsize_sblks Size of each journal file in sblks.
- * \param fc_arr Pointer to an array of file controllers, each of which correspond
to one of
- * the physical journal files.
- * \param rdp Struct carrying restore information. Optional for non-restore use,
defaults to
- * 0 (NULL).
+ * \param rdp Struct carrying restore information. Optional for non-restore use,
defaults to 0 (NULL).
*/
- void initialize(const u_int16_t nfiles, const u_int32_t fsize_sblks, fcntl**
fc_arr,
- rcvdat* rdp = 0);
+ void initialize(const u_int32_t fsize_sblks, rcvdat* rdp = 0);
/**
* \brief Rotate active file controller to next file in rotating file group.
Modified: store/trunk/cpp/tests/jrnl/_st_basic.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_basic.cpp 2008-10-14 09:39:26 UTC (rev 2631)
+++ store/trunk/cpp/tests/jrnl/_st_basic.cpp 2008-10-14 12:04:52 UTC (rev 2632)
@@ -379,8 +379,8 @@
unsigned m;
// Fill journal to just below threshold
- u_int32_t t = num_msgs_to_threshold(NUM_DEFAULT_JFILES,
- DEFAULT_JFSIZE_SBLKS * JRNL_SBLK_SIZE, MSG_REC_SIZE_DBLKS);
+ u_int32_t t = num_msgs_to_threshold(NUM_DEFAULT_JFILES, DEFAULT_JFSIZE_SBLKS *
JRNL_SBLK_SIZE,
+ MSG_REC_SIZE_DBLKS);
u_int32_t d = num_dequeues_rem(NUM_DEFAULT_JFILES, DEFAULT_JFSIZE_SBLKS *
JRNL_SBLK_SIZE);
for (m=0; m<t; m++)
enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);