Author: kpvdr
Date: 2007-09-20 16:35:17 -0400 (Thu, 20 Sep 2007)
New Revision: 930
Added:
store/trunk/cpp/lib/jrnl/rcvdat.hpp
store/trunk/cpp/tests/jrnl/jrnl_scope_test.cpp
Modified:
store/trunk/cpp/lib/Makefile.am
store/trunk/cpp/lib/jrnl/enq_map.cpp
store/trunk/cpp/lib/jrnl/enq_map.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jdir.cpp
store/trunk/cpp/lib/jrnl/jdir.hpp
store/trunk/cpp/lib/jrnl/jerrno.cpp
store/trunk/cpp/lib/jrnl/jerrno.hpp
store/trunk/cpp/lib/jrnl/jinf.cpp
store/trunk/cpp/lib/jrnl/jinf.hpp
store/trunk/cpp/lib/jrnl/lfh.cpp
store/trunk/cpp/lib/jrnl/lfh.hpp
store/trunk/cpp/lib/jrnl/nlfh.cpp
store/trunk/cpp/lib/jrnl/nlfh.hpp
store/trunk/cpp/lib/jrnl/pmgr.cpp
store/trunk/cpp/lib/jrnl/pmgr.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rrfc.hpp
store/trunk/cpp/lib/jrnl/wrfc.cpp
store/trunk/cpp/lib/jrnl/wrfc.hpp
store/trunk/cpp/tests/jrnl/Makefile.rtest
store/trunk/cpp/tests/jrnl/tests.ods
Log:
Journal recover implemented.
Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/Makefile.am 2007-09-20 20:35:17 UTC (rev 930)
@@ -1,4 +1,4 @@
-AM_CXXFLAGS = $(WARNING_CFLAGS) $(APR_CXXFLAGS) $(QPID_CXXFLAGS)
+AM_CXXFLAGS = $(WARNING_CFLAGS) $(APR_CXXFLAGS) $(QPID_CXXFLAGS) -pthread
lib_LTLIBRARIES = libbdbstore.la
@@ -66,6 +66,7 @@
jrnl/lfh.hpp \
jrnl/nlfh.hpp \
jrnl/pmgr.hpp \
+ jrnl/rcvdat.hpp \
jrnl/rmgr.hpp \
jrnl/rrfc.hpp \
jrnl/wmgr.hpp \
Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -44,17 +44,22 @@
enq_map::enq_map():
_map()
{
- _map.clear();
+ pthread_mutex_init(&_mutex, NULL);
}
enq_map::~enq_map()
-{}
+{
+ pthread_mutex_destroy(&_mutex);
+}
void
enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid) throw (jexception)
{
- _ret = _map.insert(std::pair<u_int64_t, u_int16_t>(rid, fid));
- if (_ret.second == false)
+ pthread_mutex_lock(&_mutex);
+ std::pair<std::map<u_int64_t, u_int16_t>::iterator, bool> ret =
+ _map.insert(std::pair<u_int64_t, u_int16_t>(rid, fid));
+ pthread_mutex_unlock(&_mutex);
+ if (ret.second == false)
{
std::stringstream ss;
ss << std::hex << std::setfill('0');
@@ -66,32 +71,57 @@
const u_int16_t
enq_map::get_fid(const u_int64_t rid) throw (jexception)
{
- _it = _map.find(rid);
- if (_it == _map.end())
+ pthread_mutex_lock(&_mutex);
+ std::map<u_int64_t, u_int16_t>::iterator itr = _map.find(rid);
+ pthread_mutex_unlock(&_mutex);
+ if (itr == _map.end())
{
std::stringstream ss;
ss << std::hex << std::setfill('0');
ss << "rid=0x" << std::setw(16) << rid;
- throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map",
"get_remove_fid");
+ throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map",
"get_fid");
}
- return _it->second;
+ return itr->second;
}
const u_int16_t
enq_map::get_remove_fid(const u_int64_t rid) throw (jexception)
{
- _it = _map.find(rid);
- if (_it == _map.end())
+ pthread_mutex_lock(&_mutex);
+ std::map<u_int64_t, u_int16_t>::iterator itr = _map.find(rid);
+ if (itr == _map.end())
{
+ pthread_mutex_unlock(&_mutex);
std::stringstream ss;
ss << std::hex << std::setfill('0');
ss << "rid=0x" << std::setw(16) << rid;
throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map",
"get_remove_fid");
}
- u_int16_t fid = _it->second;
- _map.erase(_it);
+ u_int16_t fid = itr->second;
+ _map.erase(itr);
+ pthread_mutex_unlock(&_mutex);
return fid;
}
+void
+enq_map::rid_list(std::vector<u_int64_t>& rv)
+{
+ rv.clear();
+ pthread_mutex_lock(&_mutex);
+ for (std::map<u_int64_t, u_int16_t>::iterator itr = _map.begin(); itr !=
_map.end(); itr++)
+ rv.push_back(itr->first);
+ pthread_mutex_unlock(&_mutex);
+}
+
+void
+enq_map::fid_list(std::vector<u_int16_t>& fv)
+{
+ fv.clear();
+ pthread_mutex_lock(&_mutex);
+ for (std::map<u_int64_t, u_int16_t>::iterator itr = _map.begin(); itr !=
_map.end(); itr++)
+ fv.push_back(itr->second);
+ pthread_mutex_unlock(&_mutex);
+}
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.hpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/enq_map.hpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -42,6 +42,8 @@
}
#include <map>
+#include <vector>
+#include <pthread.h>
#include <jrnl/jexception.hpp>
namespace rhm
@@ -58,8 +60,7 @@
{
private:
std::map<u_int64_t, u_int16_t> _map;
- std::map<u_int64_t, u_int16_t>::iterator _it;
- std::pair<std::map<u_int64_t, u_int16_t>::iterator, bool> _ret;
+ pthread_mutex_t _mutex;
public:
enq_map();
@@ -71,6 +72,8 @@
inline void clear() { _map.clear(); }
inline const bool empty() const { return _map.empty(); }
inline const u_int16_t size() const { return (u_int16_t)_map.size(); }
+ void rid_list(std::vector<u_int64_t>& rv);
+ void fid_list(std::vector<u_int16_t>& fv);
};
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -54,6 +54,7 @@
_base_filename(base_filename),
_init_flag(false),
_stop_flag(false),
+ _readonly_flag(false),
_autostop(true),
_datafh(NULL),
_emap(),
@@ -82,6 +83,7 @@
{
// Prepare journal dir, journal files and file handles
_jdir.clear_dir();
+ _emap.clear();
// TODO - place this in a finalize() fn? - see ~jcntl()...
if (_datafh)
@@ -101,7 +103,7 @@
{
std::stringstream ss;
ss << _jdir << "/" << _base_filename;
- _datafh[i] = ::new lfh(ss.str(), i);
+ _datafh[i] = ::new lfh(ss.str(), i, NULL);
}
// TODO: Check the following comment/note (may be obsolete):
@@ -119,6 +121,69 @@
_init_flag = true;
}
+void
+jcntl::recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
std::deque<data_tok*>* wr_dtokl,
+ const aio_cb wr_cb) throw (jexception)
+{
+ // Verify journal dir and journal files
+ _jdir.verify_dir();
+ _rcvdat.reset();
+ _emap.clear();
+ rcvr_janalyze(_rcvdat);
+//std::cout << "em=" << _emap.size() << "; rd="
<< rd._ffid << ":0x" << std::hex << rd._fro <<
std::dec << "-" << rd._lfid << ":0x" <<
std::hex << rd._eo << std::dec << " ";
+//std::vector<u_int64_t> kv;
+//std::vector<u_int64_t>::iterator ki;
+//_emap.rid_list(kv);
+//std::cout << " rids=[";
+//for (ki=kv.begin(); ki<kv.end(); ki++) {if (ki!=kv.begin()) std::cout <<
", "; std::cout << *ki;}
+//std::cout << "]" << std::flush;
+
+ if (_datafh)
+ {
+ for (u_int32_t i=0; i<JRNL_NUM_FILES; i++)
+ if (_datafh[i])
+ ::delete _datafh[i];
+ ::delete[] _datafh;
+ }
+
+ _datafh = ::new lfh*[JRNL_NUM_FILES];
+ // NOTE: We NULL the pointer array prior to setting the pointers because exceptions
+ // can be thrown during pointer initialization, and the clean() fn that will be
+ // called after an exception will attempt to free any non-null pointer.
+ ::memset(_datafh, 0, sizeof(lfh*) * JRNL_NUM_FILES);
+ for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
+ {
+ std::stringstream ss;
+ ss << _jdir << "/" << _base_filename;
+ _datafh[i] = ::new lfh(ss.str(), i, &_rcvdat);
+ }
+
+ // TODO: Check the following comment/note (may be obsolete):
+ // NOTE: The write RFC must initialize first. This sets all the file handle object
+ // (lfh) counters and pointers for both read and write, since write activity
+ // constrains read activity (i.e. one can't read what has not yet been written).
+// _wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, _rcvdat._lfid, _rcvdat._h_rid +
1);
+ _wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, &_rcvdat);
+ _rrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, _rcvdat._ffid);
+ _rmgr.initialize(rd_dtokl, rd_cb);
+ _wmgr.initialize(wr_dtokl, wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
+
+ _readonly_flag = true;
+ _init_flag = true;
+}
+
+void
+jcntl::recovered() throw (jexception)
+{
+ if (!_readonly_flag)
+ throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl",
"recovered");
+ for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
+ _datafh[i]->reset(&_rcvdat);
+ _wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, &_rcvdat);
+ _rrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, _rcvdat._ffid);
+ _readonly_flag = false;
+}
+
void
jcntl::delete_jrnl() throw (jexception)
{
@@ -131,21 +196,21 @@
jcntl::enqueue_data(const void* const dbuf, const size_t dlen, data_tok* const dtok)
throw (jexception)
{
- check_status("enqueue_data");
+ check_wstatus("enqueue_data");
return _wmgr.enqueue(dbuf, dlen, dtok);
}
const iores
jcntl::read_data(void* const dbuf, const size_t dbsize, data_tok* const dtok) throw
(jexception)
{
- check_status("read_data");
+ check_rstatus("read_data");
return _rmgr.read(dbuf, dbsize, dtok);
}
const iores
jcntl::dequeue_data(data_tok* const dtok) throw (jexception)
{
- check_status("dequeue_data");
+ check_wstatus("dequeue_data");
return _wmgr.dequeue(dtok);
}
@@ -164,25 +229,42 @@
void
jcntl::stop(bool block_for_aio_cmpl) throw (jexception)
{
- check_status("stop");
+ if (_readonly_flag)
+ check_rstatus("stop");
+ else
+ check_wstatus("stop");
_stop_flag = true;
- flush();
- if (block_for_aio_cmpl)
- aio_cmpl_wait();
+ if (!_readonly_flag)
+ {
+ flush();
+ if (block_for_aio_cmpl)
+ aio_cmpl_wait();
+ }
}
// Private functions
void
-jcntl::check_status(const char* fn_name) const throw (jexception)
+jcntl::check_wstatus(const char* fn_name) const throw (jexception)
{
if (!_init_flag)
throw jexception(jerrno::JERR__NINIT, "jcntl", fn_name);
+ if (_readonly_flag)
+ throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl", fn_name);
if (_stop_flag)
throw jexception(jerrno::JERR_JCNTL_STOPPED, "jcntl", fn_name);
}
void
+jcntl::check_rstatus(const char* fn_name) const throw (jexception)
+{
+ if (!_init_flag)
+ throw jexception(jerrno::JERR__NINIT, "jcntl", fn_name);
+ if (_stop_flag)
+ throw jexception(jerrno::JERR_JCNTL_STOPPED, "jcntl", fn_name);
+}
+
+void
jcntl::write_infofile() const throw (jexception)
{
timespec ts;
@@ -199,7 +281,7 @@
if (!of.good())
throw jexception(jerrno::JERR__FILEIO, fn.str(), "jcntl",
"write_infofile");
std::string s;
- of << ji.str(s);
+ of << ji.xml_str(s);
of.close();
}
@@ -220,6 +302,136 @@
}
void
+jcntl::rcvr_janalyze(rcvdat& rd) throw (jexception)
+{
+ jinf ji(_jdir.dirname() + "/" + _base_filename + "." +
JRNL_INFO_EXTENSION, true);
+ try
+ {
+ rd._ffid = ji.get_start_file();
+ rd._empty = false;
+ }
+ catch (jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_JINF_JDATEMPTY)
+ throw e;
+ }
+//std::cout << "f" << rd._ffid <<
(rd._empty?"e":"") << " ";
+
+ // Restore all read and write pointers
+ if (!rd._empty)
+ {
+ bool eoj = false;
+ for (u_int16_t fnum=0; fnum<JRNL_NUM_FILES && !eoj; fnum++)
+ {
+ u_int16_t fid = (fnum + rd._ffid) % JRNL_NUM_FILES;
+ eoj = rcvr_fanalyze(fid, rd);
+ }
+ }
+}
+
+const bool
+jcntl::rcvr_fanalyze(u_int16_t fid, rcvdat& rd) throw (jexception)
+{
+ bool eoj = false;
+ std::stringstream ss;
+ ss << _jdir.dirname() << "/" << _base_filename <<
".";
+ ss << std::setfill('0') << std::setw(4) << fid <<
"." << JRNL_DATA_EXTENSION;
+//std::cout << "rcvr_fanalyze: " << ss.str() <<
":";
+ std::ifstream jifs(ss.str().c_str());
+ if (!jifs.good())
+ throw jexception(jerrno::JERR__FILEIO, ss.str(), "jinf",
"analyze");
+
+ // 1. Read file header
+ file_hdr fhdr;
+ jifs.read((char*)&fhdr, sizeof(fhdr));
+ if (fhdr._hdr._magic == RHM_JDAT_FILE_MAGIC)
+ {
+ assert(fhdr._fid == fid);
+ if (!rd._fro)
+ rd._fro = fhdr._fro;
+ std::streamoff foffs = fhdr._fro;
+ jifs.seekg(foffs);
+
+ // 2. Read file records
+ while (jifs.good() && !eoj)
+ {
+ hdr h;
+ jifs.read((char*)&h, sizeof(hdr));
+ switch(h._magic)
+ {
+ case RHM_JDAT_ENQ_MAGIC:
+ {
+ size_t recsize = 0;
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+ jifs.ignore(sizeof(u_int32_t));
+#endif
+ jifs.read((char*)&recsize, sizeof(size_t));
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+ jifs.ignore(sizeof(u_int32_t));
+#endif
+ _emap.insert_fid(h._rid, fid);
+ rd._enq_cnt_list[fid]++;
+ if (rd._h_rid < h._rid)
+ rd._h_rid = h._rid;
+//std::cout << " e" << h._rid;
+ u_int32_t rec_dblks = jrec::size_dblks((size_t)recsize +
sizeof(enq_hdr) +
+ sizeof(enq_tail));
+ foffs += rec_dblks * JRNL_DBLK_SIZE;
+ jifs.seekg(foffs);
+ }
+ break;
+ case RHM_JDAT_DEQ_MAGIC:
+ {
+ u_int64_t drid = 0;
+ jifs.read((char*)&drid, sizeof(u_int64_t));
+ try
+ {
+ _emap.get_remove_fid(drid);
+ rd._enq_cnt_list[fid]--;
+ }
+ catch (jexception& e) {} // ignore JERR_EMAP_NOTFOUND thrown
here
+ if (rd._h_rid < h._rid)
+ rd._h_rid = h._rid;
+//std::cout << " d" << drid << ")";
+ u_int32_t rec_dblks = jrec::size_dblks(sizeof(deq_hdr));
+ foffs += rec_dblks * JRNL_DBLK_SIZE;
+ jifs.seekg(foffs);
+ }
+ break;
+ case RHM_JDAT_EMPTY_MAGIC:
+ {
+//std::cout << " x";
+ u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
+ foffs += rec_dblks * JRNL_DBLK_SIZE;
+ jifs.seekg(foffs);
+ }
+ break;
+ case 0:
+ rd._lfid = fid;
+ rd._eo = foffs;
+ if (!jifs.eof())
+ eoj = true;
+//std::cout << (jifs.eof()?" <eof>":" <end>");
+ break;
+ default:
+ std::stringstream ss;
+ ss << std::hex << std::setfill('0') <<
"Magic=0x" << std::setw(8) << h._magic;
+ throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str(),
"jcntl",
+ "rcvr_fanalyze");
+ }
+ }
+ }
+ else
+ {
+ eoj = true;
+//std::cout << " <empty>";
+ }
+ jifs.close();
+//std::cout << std::endl;
+ return eoj;
+}
+
+void
jcntl::aio_wr_callback(jcntl* journal, u_int32_t num_dtoks)
{
@@ -232,7 +444,7 @@
for (u_int32_t i=0; i<num_dtoks; i++)
{
data_tok*& dtokp = this_dtok_list.front();
- if (!journal->is_stopped())
+ if (!journal->is_stopped() && dtokp->getSourceMessage())
{
data_tok::write_state st = dtokp->wstate();
if (st == data_tok::ENQ)
@@ -270,7 +482,7 @@
for (u_int32_t i=0; i<num_dtoks; i++)
{
data_tok*& dtokp = this_dtok_list.front();
- if (!journal->is_stopped())
+ if (!journal->is_stopped() && dtokp->getSourceMessage())
{
if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() ==
data_tok::READ)
{
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -44,6 +44,7 @@
#include <deque>
#include <jrnl/jdir.hpp>
#include <jrnl/lfh.hpp>
+#include <jrnl/rcvdat.hpp>
#include <jrnl/rmgr.hpp>
#include <jrnl/wmgr.hpp>
#include <jrnl/wrfc.hpp>
@@ -118,6 +119,8 @@
*/
bool _stop_flag;
+ bool _readonly_flag;
+
bool _autostop; ///< Autostop flag - stops journal when overrun
occurs
// Journal control structures
@@ -127,6 +130,7 @@
wrfc _wrfc; ///< Write journal rotating file controller
rmgr _rmgr; ///< Read page manager which manages AIO
wmgr _wmgr; ///< Write page manager which manages AIO
+ rcvdat _rcvdat; ///< Recovery data used for recovery
std::deque<rhm::journal::data_tok*> _aio_rd_cmpl_dtok_list; ///<
Internally mamanged deque
std::deque<rhm::journal::data_tok*> _aio_wr_cmpl_dtok_list; ///<
Internally mamanged deque
@@ -157,7 +161,9 @@
* <b>NOTE: Any existing journal will be ignored by this
operation.</b> To use recover
* the data from an existing journal, use recover().
*
- * <b>NOTE: if NULL is passed to the deque's they are created internally
and deleted intenally </b>
+ * <b>NOTE: if NULL is passed to the deque's they are created internally
and deleted
+ * intenally </b>
+ *
* <b>NOTE: if NULL is passed to the callbacks internal ones will be used.
</b>
*
* \param rd_dtokl deque for storing data tokens retruning from read AIO
operations.
@@ -173,23 +179,24 @@
* Initialize using internal default callbacks and data_tok lists.
* TODO: Move to JournalImpl later
*/
- void initialize()
+ void initialize() throw (jexception)
{
initialize(&_aio_rd_cmpl_dtok_list, &aio_rd_callback,
&_aio_wr_cmpl_dtok_list,
&aio_wr_callback );
}
- /// replace with real code to recover
-// void recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
-// std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb) throw (jexception);
-
-
- void recover() {
- initialize(&_aio_rd_cmpl_dtok_list, &aio_rd_callback,
&_aio_wr_cmpl_dtok_list, &aio_wr_callback );
- }
- void recovered() {}
+ void recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
+ std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb) throw (jexception);
+ void recover() throw (jexception)
+ {
+ recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback,
&_aio_wr_cmpl_dtok_list,
+ &aio_wr_callback );
+ }
+
+ void recovered() throw (jexception);
+
/**
* \brief Delete the journal directory of files matching the base filename
* by moving them into a subdirectory
@@ -223,6 +230,9 @@
*/
const iores read_data(void* const dbuf, const size_t dbsize, data_tok* const
dtok)
throw (jexception);
+
+// const iores read_data(void** dbuf, size_t& dbsize, data_tok* const dtok)
+// throw (jexception);
/**
* \brief Dequeues (marks as no longer needed) data from journal.
@@ -268,22 +278,23 @@
* AIO operations to complete.
*/
void stop(bool block_for_aio_cmpl = false) throw (jexception);
-
+
/**
+ * \brief Force a flush of the write page cache, creating a single AIO write
operation.
+ */
+ inline void flush() throw (jexception) { _wmgr.flush(); }
+
+ inline const u_int32_t get_enq_cnt() const { return _emap.size(); }
+
+ /**
* \brief Check if the journal is stopped.
*
- *
* \return <b><i>true</i></b> if the jouranl is stopped;
* <b><i>false</i></b> otherwise.
*/
- bool is_stopped() {return _stop_flag;}
+ bool is_stopped() { return _stop_flag; }
/**
- * \brief Force a flush of the write page cache, creating a single AIO write
operation.
- */
- inline void flush() throw (jexception) { _wmgr.flush(); }
-
- /**
* \brief Check if the journal is ready to read and write data.
*
* Checks if the jouranl is ready to read and write data. This function will
return
@@ -298,6 +309,8 @@
*/
inline const bool is_ready() const { return _init_flag and not _stop_flag; }
+ inline const bool is_read_only() const { return _readonly_flag; }
+
/**
* \brief Get the journal directory.
*
@@ -321,11 +334,16 @@
private:
/**
- * \brief Check status of journal before allowing certain operations.
+ * \brief Check status of journal before allowing write operations.
*/
- void check_status(const char* fn_name) const throw (jexception);
+ void check_wstatus(const char* fn_name) const throw (jexception);
/**
+ * \brief Check status of journal before allowing read operations.
+ */
+ void check_rstatus(const char* fn_name) const throw (jexception);
+
+ /**
* \brief Write info file <basefilename>.jinf to disk
*/
void write_infofile() const throw (jexception);
@@ -335,6 +353,18 @@
*/
void aio_cmpl_wait() throw (jexception);
+ /**
+ * \brief Analyze journal for recovery.
+ */
+ void rcvr_janalyze(rcvdat& jrs) throw (jexception);
+
+ /**
+ * \brief Analyze a particular journal file for recovery.
+ *
+ * \return <b><i>true</i></b> if end of journal (eoj)
found; <b><i>false</i></b> otherwise.
+ */
+ const bool rcvr_fanalyze(u_int16_t fid, rcvdat& jrs) throw (jexception);
+
/**
* Intenal callback write
*/
Modified: store/trunk/cpp/lib/jrnl/jdir.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.cpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/jdir.cpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -35,9 +35,11 @@
#include <dirent.h>
#include <cerrno>
+//#include <fstream>
#include <iomanip>
#include <sstream>
#include <sys/stat.h>
+#include <jrnl/jcfg.hpp>
#include <jrnl/jerrno.hpp>
#include <iostream>// debug only
namespace rhm
@@ -48,8 +50,7 @@
jdir::jdir(const std::string& dirname, const std::string& _base_filename):
_dirname(dirname),
_base_filename(_base_filename)
-{
-}
+{}
// === create_dir ===
@@ -157,6 +158,8 @@
}
}
}
+// FIXME: Find out why this fails with false alarms/errors from time to time...
+// While commented out, there is no error capture from reading dir entries.
// if (errno)
// {
// std::stringstream ss;
@@ -174,6 +177,45 @@
}
+// === verify_dir ===
+
+void
+jdir::verify_dir() throw (jexception)
+{
+ verify_dir(_dirname, _base_filename);
+}
+
+void
+jdir::verify_dir(const char* dirname, const char* base_filename) throw (jexception)
+{
+ verify_dir(std::string(dirname), std::string(base_filename));
+}
+
+
+void
+jdir::verify_dir(const std::string& dirname, const std::string& base_filename)
throw (jexception)
+{
+ if (!is_dir(dirname))
+ {
+ std::stringstream ss;
+ ss << "dir=\"" << dirname <<
"\"";
+ throw jexception(jerrno::JERR_JDIR_NOTDIR, ss.str(), "jdir",
"verify_dir");
+ }
+
+ // Read jinf file, then verify all journal files are present
+ jinf ji(dirname + "/" + base_filename + "." +
JRNL_INFO_EXTENSION, true);
+ for (u_int16_t fnum=0; fnum < ji.num_files(); fnum++)
+ {
+ std::stringstream ss;
+ ss << dirname << "/" << base_filename <<
".";
+ ss << std::setw(4) << std::setfill('0') << std::hex
<< fnum;
+ ss << "." << JRNL_DATA_EXTENSION;
+ if (!exists(ss.str()))
+ throw jexception(jerrno::JERR_JDIR_NOSUCHFILE, ss.str(), "jdir",
"verify_dir");
+ }
+}
+
+
// === delete_dir ===
void
@@ -241,6 +283,8 @@
}
}
}
+// FIXME: Find out why this fails with false alarms/errors from time to time...
+// While commented out, there is no error capture from reading dir entries.
// if (errno)
// {
// std::stringstream ss;
@@ -298,6 +342,8 @@
}
}
}
+// FIXME: Find out why this fails with false alarms/errors from time to time...
+// While commented out, there is no error capture from reading dir entries.
// if (errno)
// {
// std::stringstream ss;
Modified: store/trunk/cpp/lib/jrnl/jdir.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.hpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/jdir.hpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -43,6 +43,7 @@
}
#include <jrnl/jexception.hpp>
+#include <jrnl/jinf.hpp>
namespace rhm
{
@@ -60,6 +61,7 @@
std::string _base_filename;
public:
+
/**
* \brief Sole constructor
*
@@ -148,14 +150,49 @@
static void clear_dir(const std::string& dirname, const std::string&
base_filename,
const bool create_flag = true) throw (jexception);
+
/**
+ * \brief Clear the journal directory of files matching the base filename
+ * by moving them into a subdirectory. This fn uses the dirname and
base_filename
+ * that were set on construction.
+ *
+ * \exception ??
+ */
+ void verify_dir() throw (jexception);
+
+ /**
+ * \brief Clear the directory dirname of journal files matching base_filename
+ * by moving them into a subdirectory.
+ *
+ * \param dirname C-string containing name of journal directory.
+ * \param base_filename C-string containing base filename of journal files to be
matched
+ * for moving into subdirectory.
+ *
+ * \exception ??
+ */
+ static void verify_dir(const char* dirname, const char* base_filename) throw
(jexception);
+
+ /**
+ * \brief Clear the directory dirname of journal files matching base_filename
+ * by moving them into a subdirectory.
+ *
+ * \param dirname String containing name of journal directory.
+ * \param base_filename String containing base filename of journal files to be
matched
+ * for moving into subdirectory.
+ *
+ * \exception ??
+ */
+ static void verify_dir(const std::string& dirname, const std::string&
base_filename)
+ throw (jexception);
+
+ /**
* \brief Delete the journal directory and all files and sub--directories that it
may
* contain. This is equivilent of rm -rf.
*
* FIXME: links are not handled correctly.
- *
- * \param children_only To only delete children.
*
+ * \param children_only To only delete children.
+ *
* \exception The journal directory could not be opened.
* \exception The move of files from the journal directory to the created backup
* directory failed.
@@ -170,7 +207,7 @@
* FIXME: links are not handled correctly.
*
* \param dirname C-string containing name of directory to be deleted.
- * \param children_only To only delete children.
+ * \param children_only To only delete children.
*
* \exception The journal directory could not be opened.
* \exception The move of files from the journal directory to the created backup
@@ -186,14 +223,15 @@
* FIXME: links are not handled correctly.
*
* \param dirname String containing name of directory to be deleted.
- * \param children_only To only delete children.
+ * \param children_only To only delete children.
*
* \exception The journal directory could not be opened.
* \exception The move of files from the journal directory to the created backup
* directory failed.
* \exception The directory handle could not be closed.
*/
- static void delete_dir(const std::string& dirname, bool children_only =
false) throw (jexception);
+ static void delete_dir(const std::string& dirname, bool children_only =
false)
+ throw (jexception);
/**
* \brief Create bakup directory that is next in sequence and move all journal
files
@@ -273,6 +311,8 @@
*/
const static bool exists(const std::string& name) throw (jexception);
+ public:
+
/**
* \brief Stream operator
*/
Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -53,18 +53,23 @@
// class jcntl
const u_int32_t jerrno::JERR_JCNTL_STOPPED = 0x0200;
-const u_int32_t jerrno::JERR_JCNTL_AIOCMPLWAIT = 0x0201;
+const u_int32_t jerrno::JERR_JCNTL_READONLY = 0x0201;
+const u_int32_t jerrno::JERR_JCNTL_AIOCMPLWAIT = 0x0202;
+const u_int32_t jerrno::JERR_JCNTL_UNKNOWNMAGIC = 0x0203;
+const u_int32_t jerrno::JERR_JCNTL_NOTRECOVERED = 0x0204; ///< Req' recover() to
be called first
// class jdir
-const u_int32_t jerrno::JERR_JDIR_MKDIR = 0x0300;
-const u_int32_t jerrno::JERR_JDIR_OPENDIR = 0x0301;
-const u_int32_t jerrno::JERR_JDIR_READDIR = 0x0302;
-const u_int32_t jerrno::JERR_JDIR_CLOSEDIR = 0x0303;
-const u_int32_t jerrno::JERR_JDIR_RMDIR = 0x0304;
-const u_int32_t jerrno::JERR_JDIR_FMOVE = 0x0305;
-const u_int32_t jerrno::JERR_JDIR_STAT = 0x0306;
-const u_int32_t jerrno::JERR_JDIR_UNLINK = 0x0307;
-const u_int32_t jerrno::JERR_JDIR_BADFTYPE = 0x0308;
+const u_int32_t jerrno::JERR_JDIR_NOTDIR = 0x0300;
+const u_int32_t jerrno::JERR_JDIR_MKDIR = 0x0301;
+const u_int32_t jerrno::JERR_JDIR_OPENDIR = 0x0302;
+const u_int32_t jerrno::JERR_JDIR_READDIR = 0x0303;
+const u_int32_t jerrno::JERR_JDIR_CLOSEDIR = 0x0304;
+const u_int32_t jerrno::JERR_JDIR_RMDIR = 0x0305;
+const u_int32_t jerrno::JERR_JDIR_NOSUCHFILE = 0x0306;
+const u_int32_t jerrno::JERR_JDIR_FMOVE = 0x0307;
+const u_int32_t jerrno::JERR_JDIR_STAT = 0x0308;
+const u_int32_t jerrno::JERR_JDIR_UNLINK = 0x0309;
+const u_int32_t jerrno::JERR_JDIR_BADFTYPE = 0x030a;
// class nlfh
const u_int32_t jerrno::JERR_NLFH_OPENRD = 0x0400;
@@ -103,7 +108,13 @@
const u_int32_t jerrno::JERR_EMAP_DUPLICATE = 0x0b00;
const u_int32_t jerrno::JERR_EMAP_NOTFOUND = 0x0b01;
+// class jinf
+const u_int32_t jerrno::JERR_JINF_CVALIDFAIL = 0x0c00;
+const u_int32_t jerrno::JERR_JINF_NOVALUESTR = 0x0c01;
+const u_int32_t jerrno::JERR_JINF_BADVALUESTR = 0x0c02;
+const u_int32_t jerrno::JERR_JINF_JDATEMPTY = 0x0c03;
+
bool
jerrno::__init()
{
@@ -117,15 +128,24 @@
// class jcntl
_err_map[JERR_JCNTL_STOPPED] = std::string("JERR_JCNTL_STOPPED: Operation on
stopped journal.");
+ _err_map[JERR_JCNTL_READONLY] = std::string("JERR_JCNTL_READONLY: "
+ "Write operation on read-only journal (during recovery).");
_err_map[JERR_JCNTL_AIOCMPLWAIT] = std::string("JERR_JCNTL_AIOCMPLWAIT: "
"Timeout waiting for AIOs to complete.");
+ _err_map[JERR_JCNTL_UNKNOWNMAGIC] = std::string("JERR_JCNTL_UNKNOWNMAGIC:
"
+ "Found record with unknown magic.");
+ _err_map[JERR_JCNTL_NOTRECOVERED] = std::string("JERR_JCNTL_NOTRECOVERED:
"
+ "Operation requires recover() to be run first.");
// class jdir
+ _err_map[JERR_JDIR_NOTDIR] = std::string("JERR_JDIR_NOTDIR: "
+ "Directory name exists but is not a directory.");
_err_map[JERR_JDIR_MKDIR] = std::string("JERR_JDIR_MKDIR: Directory creation
failed.");
_err_map[JERR_JDIR_OPENDIR] = std::string("JERR_JDIR_OPENDIR: Directory open
failed.");
_err_map[JERR_JDIR_READDIR] = std::string("JERR_JDIR_READDIR: Directory read
failed.");
_err_map[JERR_JDIR_CLOSEDIR] = std::string("JERR_JDIR_CLOSEDIR: Directory close
failed.");
_err_map[JERR_JDIR_RMDIR] = std::string("JERR_JDIR_RMDIR: Directory delete
failed.");
+ _err_map[JERR_JDIR_NOSUCHFILE] = std::string("JERR_JDIR_NOSUCHFILE: File does
not exist.");
_err_map[JERR_JDIR_FMOVE] = std::string("JERR_JDIR_FMOVE: File move
failed.");
_err_map[JERR_JDIR_STAT] = std::string("JERR_JDIR_STAT: File stat
failed.");
_err_map[JERR_JDIR_UNLINK] = std::string("JERR_JDIR_UNLINK: File delete
failed.");
@@ -184,6 +204,15 @@
_err_map[JERR_EMAP_NOTFOUND] = std::string("JERR_EMAP_NOTFOUND: "
"Key not found in enqueue map.");
+ // class jinf
+ _err_map[JERR_JINF_CVALIDFAIL] = std::string("JERR_JINF_CVALIDFAIL: "
+ "Compatibility validation failure.");
+ _err_map[JERR_JINF_NOVALUESTR] = std::string("JERR_JINF_NOVALUESTR: "
+ "No value attribute found in jinf file");
+ _err_map[JERR_JINF_BADVALUESTR] = std::string("JERR_JINF_BADVALUESTR: "
+ "Bad format for value attribute in jinf file");
+ _err_map[JERR_JINF_JDATEMPTY] = std::string("JERR_JINF_JDATEMPTY: Journal data
files empty.");
+
//_err_map[] = std::string("");
return true;
Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -70,14 +70,19 @@
// class jcntl
static const u_int32_t JERR_JCNTL_STOPPED; ///< Operation on stopped
journal
+ static const u_int32_t JERR_JCNTL_READONLY; ///< Write operation on
read-only journal
static const u_int32_t JERR_JCNTL_AIOCMPLWAIT; ///< Timeout waiting for AIOs
to complete
+ static const u_int32_t JERR_JCNTL_UNKNOWNMAGIC; ///< Found record with unknown
magic
+ static const u_int32_t JERR_JCNTL_NOTRECOVERED; ///< Req' recover() to be
called first
// class jdir
+ static const u_int32_t JERR_JDIR_NOTDIR; ///< Exists but is not a
directory
static const u_int32_t JERR_JDIR_MKDIR; ///< Directory creation
failed
static const u_int32_t JERR_JDIR_OPENDIR; ///< Directory open failed
static const u_int32_t JERR_JDIR_READDIR; ///< Directory read failed
static const u_int32_t JERR_JDIR_CLOSEDIR; ///< Directory close failed
static const u_int32_t JERR_JDIR_RMDIR; ///< Directory delete failed
+ static const u_int32_t JERR_JDIR_NOSUCHFILE; ///< File does not exist
static const u_int32_t JERR_JDIR_FMOVE; ///< File move failed
static const u_int32_t JERR_JDIR_STAT; ///< File stat failed
static const u_int32_t JERR_JDIR_UNLINK; ///< File delete failed
@@ -120,6 +125,11 @@
static const u_int32_t JERR_EMAP_DUPLICATE; ///< Attempted to insert using
duplicate key
static const u_int32_t JERR_EMAP_NOTFOUND; ///< Key not found in map
+ // class jinf
+ static const u_int32_t JERR_JINF_CVALIDFAIL; ///< Compatibility validation
failure
+ static const u_int32_t JERR_JINF_NOVALUESTR; ///< No value attr found in
jinf file
+ static const u_int32_t JERR_JINF_BADVALUESTR; ///< Bad format for value attr
in jinf file
+ static const u_int32_t JERR_JINF_JDATEMPTY; ///< Journal data files empty
/**
* \brief Method to access error message from known error number.
Modified: store/trunk/cpp/lib/jrnl/jinf.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.cpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/jinf.cpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -33,39 +33,202 @@
#include <jrnl/jinf.hpp>
+#include <fstream>
#include <jrnl/jcntl.hpp>
-//#include <errno.h>
-//#include <iomanip>
+#include <jrnl/jerrno.hpp>
#include <sstream>
-//#include <jrnl/jcntl.hpp>
namespace rhm
{
namespace journal
{
+jinf::jinf(const std::string& jinf_filename, bool validate_flag) throw (jexception):
+ _jver(0),
+ _num_files(0),
+ _fsize_sblks(0),
+ _sblk_size_dblks(0),
+ _dblk_size(0),
+ _wmgr_page_size_dblks(0),
+ _wmgr_num_pages(0),
+ _rmgr_page_size_dblks(0),
+ _rmgr_num_pages(0),
+ _tm_ptr(NULL),
+ _valid_flag(false),
+ _analyzed_flag(false),
+ _start_file(0)
+{
+ read(jinf_filename);
+ if (validate_flag)
+ validate();
+}
+
jinf::jinf(const std::string& jid, const std::string& jdir, const
std::string& base_filename,
const timespec& ts):
+ _jver(RHM_JDAT_VERSION),
_jid(jid),
_jdir(jdir),
_base_filename(base_filename),
_ts(ts),
- _tm_ptr(::localtime(&ts.tv_sec))
+ _num_files(JRNL_NUM_FILES),
+ _fsize_sblks(JRNL_FILE_SIZE),
+ _sblk_size_dblks(JRNL_SBLK_SIZE),
+ _dblk_size(JRNL_DBLK_SIZE),
+ _wmgr_page_size_dblks(JRNL_WMGR_PAGE_SIZE),
+ _wmgr_num_pages(JRNL_WMGR_PAGES),
+ _rmgr_page_size_dblks(JRNL_RMGR_PAGE_SIZE),
+ _rmgr_num_pages(JRNL_RMGR_PAGES),
+ _tm_ptr(::localtime(&ts.tv_sec)),
+ _valid_flag(false),
+ _analyzed_flag(false),
+ _start_file(0)
{}
jinf::~jinf()
{}
+void
+jinf::validate() throw (jexception)
+{
+ bool err = false;
+ std::stringstream ss;
+ if (_jver != RHM_JDAT_VERSION)
+ {
+ ss << "RHM_JDAT_VERSION mismatch: " << _jver;
+ ss << "; required=" << RHM_JDAT_VERSION <<
std::endl;
+ err = true;
+ }
+ if (_num_files != JRNL_NUM_FILES)
+ {
+ ss << "JRNL_NUM_FILES mismatch: " << _num_files;
+ ss << "; required=" << JRNL_NUM_FILES << std::endl;
+ err = true;
+ }
+ if (_fsize_sblks != JRNL_FILE_SIZE)
+ {
+ ss << "JRNL_FILE_SIZE mismatch: " << _fsize_sblks;
+ ss << "; required=" << JRNL_FILE_SIZE << std::endl;
+ err = true;
+ }
+ if (_sblk_size_dblks != JRNL_SBLK_SIZE)
+ {
+ ss << "JRNL_SBLK_SIZE mismatch: " << _sblk_size_dblks;
+ ss << "; required=" << JRNL_SBLK_SIZE << std::endl;
+ err = true;
+ }
+ if (_dblk_size != JRNL_DBLK_SIZE)
+ {
+ ss << "JRNL_DBLK_SIZE mismatch: " << _dblk_size;
+ ss << "; required=" << JRNL_DBLK_SIZE << std::endl;
+ err = true;
+ }
+ if (_wmgr_page_size_dblks != JRNL_WMGR_PAGE_SIZE)
+ {
+ ss << "JRNL_WMGR_PAGE_SIZE mismatch: " <<
_wmgr_page_size_dblks;
+ ss << "; required=" << JRNL_WMGR_PAGE_SIZE <<
std::endl;
+ err = true;
+ }
+ if (_wmgr_num_pages != JRNL_WMGR_PAGES)
+ {
+ ss << "JRNL_WMGR_PAGES mismatch: " << _wmgr_num_pages;
+ ss << "; required=" << JRNL_WMGR_PAGES << std::endl;
+ err = true;
+ }
+ if (_rmgr_page_size_dblks != JRNL_RMGR_PAGE_SIZE)
+ {
+ ss << "JRNL_RMGR_PAGE_SIZE mismatch: " <<
_rmgr_page_size_dblks;
+ ss << "; required=" << JRNL_RMGR_PAGE_SIZE <<
std::endl;
+ err = true;
+ }
+ if (_rmgr_num_pages != JRNL_RMGR_PAGES)
+ {
+ ss << "JRNL_RMGR_PAGES mismatch: " << _rmgr_num_pages;
+ ss << "; required=" << JRNL_RMGR_PAGES << std::endl;
+ err = true;
+ }
+ if (err)
+ throw jexception(jerrno::JERR_JINF_CVALIDFAIL, ss.str(), "jinf",
"validate");
+ _valid_flag = true;
+}
+
+const u_int16_t
+jinf::analyze() throw (jexception)
+{
+ u_int16_t fid = 0xffff;
+ u_int64_t rid = (u_int64_t)-1; // TODO: check this, 64-bit literals are a problem!
+
+ if (!_valid_flag)
+ validate();
+ for (u_int16_t fnum=0; fnum < _num_files; fnum++)
+ {
+ std::stringstream ss;
+ ss << _jdir << "/" << _base_filename <<
".";
+ ss << std::setw(4) << std::setfill('0') << std::hex
<< fnum;
+ ss << "." << JRNL_DATA_EXTENSION;
+ std::ifstream jifs(ss.str().c_str());
+ if (!jifs.good())
+ throw jexception(jerrno::JERR__FILEIO, ss.str(), "jinf",
"analyze");
+ file_hdr fhdr;
+ jifs.read((char*)&fhdr, sizeof(fhdr));
+ if (fhdr._hdr._magic != RHM_JDAT_FILE_MAGIC)
+ break;
+ if (fhdr._hdr._rid < rid)
+ {
+ fid = fnum;
+ rid = fhdr._hdr._rid;
+ }
+ jifs.close();
+ }
+ if (fid == (u_int16_t)-1)
+ throw jexception(jerrno::JERR_JINF_JDATEMPTY, "jinf",
"analyze");
+ _start_file = fid;
+ return _start_file;
+}
+
+const u_int16_t
+jinf::get_start_file() throw (jexception)
+{
+ if (!_analyzed_flag)
+ analyze();
+ return _start_file;
+}
+
const std::string&
-jinf::str(std::string& s) const
+jinf::to_string(std::string& s) const
{
std::stringstream ss;
+ ss << std::setfill('0');
+ ss << "Journal ID \"" << _jid << "\"
initialized " << (_tm_ptr->tm_year + 1900) << "/";
+ ss << std::setw(2) << (_tm_ptr->tm_mon + 1) << "/"
<< std::setw(2) << _tm_ptr->tm_mday << " ";
+ ss << std::setw(2) << _tm_ptr->tm_hour << ":" <<
std::setw(2) << _tm_ptr->tm_min << ":";
+ ss << std::setw(2) << _tm_ptr->tm_sec << "." <<
std::setw(9) << _ts.tv_nsec << ":" << std::endl;
+ ss << " Journal directory: \"" << _jdir <<
"\"" << std::endl;
+ ss << " Journal base filename: \"" << _base_filename
<< "\"" << std::endl;
+ ss << " Journal version: " << (unsigned)_jver <<
std::endl;
+ ss << " Number of journal files (JRNL_NUM_FILES): " <<
_num_files << std::endl;
+ ss << " Journal file size (JRNL_FILE_SIZE): " << _fsize_sblks
<< " sblks" << std::endl;
+ ss << " Softblock size (JRNL_SBLK_SIZE): " << _sblk_size_dblks
<< " dblks" << std::endl;
+ ss << " Datablock size (JRNL_DBLK_SIZE): " << _dblk_size
<< " bytes" << std::endl;
+ ss << " Write page size (JRNL_WMGR_PAGE_SIZE): " <<
_wmgr_page_size_dblks << " dblks" <<
+ std::endl;
+ ss << " Number of write pages (JRNL_WMGR_PAGES): " <<
_wmgr_num_pages << std::endl;
+ ss << " Read page size (JRNL_RMGR_PAGE_SIZE): " <<
_rmgr_page_size_dblks << " dblks" <<
+ std::endl;
+ ss << " Number of read pages (JRNL_RMGR_PAGES): " <<
_rmgr_num_pages << std::endl;
+ s.assign(ss.str());
+ return s;
+}
+
+const std::string&
+jinf::xml_str(std::string& s) const
+{
+ std::stringstream ss;
// TODO: I'm sure a more elegant way can be found to do this, but direct and
simple
// seems like a good start!
ss << std::setfill('0');
ss << "<?xml version=\"1.0\"?>" << std::endl;
ss << "<jrnl>" << std::endl;
- ss << " <journal_version value=\"" << RHM_JDAT_VERSION
<< "\"/>" << std::endl;
+ ss << " <journal_version value=\"" << (unsigned)_jver
<< "\"/>" << std::endl;
ss << " <journal_id>" << std::endl;
ss << " <id_string value=\"" << _jid <<
"\" />" << std::endl;
ss << " <directory value=\"" << _jdir <<
"\" />" << std::endl;
@@ -81,19 +244,103 @@
ss << "\" />" << std::endl;
ss << " </creation_time>" << std::endl;
ss << " <journal_file_geometry>" << std::endl;
- ss << " <JRNL_NUM_FILES value=\"" << JRNL_NUM_FILES
<< "\" />" << std::endl;
- ss << " <JRNL_FILE_SIZE value=\"" << JRNL_FILE_SIZE
<< "\" />" << std::endl;
- ss << " <JRNL_SBLK_SIZE value=\"" << JRNL_SBLK_SIZE
<< "\" />" << std::endl;
- ss << " <JRNL_DBLK_SIZE value=\"" << JRNL_DBLK_SIZE
<< "\" />" << std::endl;
- ss << " <JRNL_WMGR_PAGE_SIZE value=\"" <<
JRNL_WMGR_PAGE_SIZE << "\" />" << std::endl;
- ss << " <JRNL_WMGR_PAGES value=\"" <<
JRNL_WMGR_PAGES << "\" />" << std::endl;
- ss << " <JRNL_RMGR_PAGE_SIZE value=\"" <<
JRNL_RMGR_PAGE_SIZE << "\" />" << std::endl;
- ss << " <JRNL_RMGR_PAGES value=\"" <<
JRNL_RMGR_PAGES << "\" />" << std::endl;
+ ss << " <JRNL_NUM_FILES value=\"" << _num_files
<< "\" />" << std::endl;
+ ss << " <JRNL_FILE_SIZE value=\"" << _fsize_sblks
<< "\" />" << std::endl;
+ ss << " <JRNL_SBLK_SIZE value=\"" <<
_sblk_size_dblks << "\" />" << std::endl;
+ ss << " <JRNL_DBLK_SIZE value=\"" << _dblk_size
<< "\" />" << std::endl;
+ ss << " <JRNL_WMGR_PAGE_SIZE value=\"" <<
_wmgr_page_size_dblks << "\" />" << std::endl;
+ ss << " <JRNL_WMGR_PAGES value=\"" <<
_wmgr_num_pages << "\" />" << std::endl;
+ ss << " <JRNL_RMGR_PAGE_SIZE value=\"" <<
_rmgr_page_size_dblks << "\" />" << std::endl;
+ ss << " <JRNL_RMGR_PAGES value=\"" <<
_rmgr_num_pages << "\" />" << std::endl;
ss << " </journal_file_geometry>" << std::endl;
ss << "</jrnl>" << std::endl;
s.assign(ss.str());
return s;
}
+void
+jinf::read(const std::string& jinf_filename) throw (jexception)
+{
+ // FIXME: This is *not* an XML reader, rather for simplicity, it is a brute-force
+ // line reader which relies on string recognition.
+
+ char buff[1024];
+ std::ifstream jinfs(jinf_filename.c_str());
+ if (!jinfs.good())
+ throw jexception(jerrno::JERR__FILEIO, jinf_filename, "jinf",
"read");
+ while (jinfs.good())
+ {
+ jinfs.getline(buff, 1024);
+ if (::strstr(buff, "journal_version"))
+ _jver = u_int16_value(buff);
+ else if(::strstr(buff, "id_string"))
+ string_value(_jid, buff);
+ else if(::strstr(buff, "directory"))
+ string_value(_jdir, buff);
+ else if(::strstr(buff, "base_filename"))
+ string_value(_base_filename, buff);
+ else if(::strstr(buff, "JRNL_NUM_FILES"))
+ _num_files = u_int16_value(buff);
+ else if(::strstr(buff, "JRNL_FILE_SIZE"))
+ _fsize_sblks = u_int32_value(buff);
+ else if(::strstr(buff, "JRNL_SBLK_SIZE"))
+ _sblk_size_dblks = u_int16_value(buff);
+ else if(::strstr(buff, "JRNL_DBLK_SIZE"))
+ _dblk_size = u_int32_value(buff);
+ else if(::strstr(buff, "JRNL_WMGR_PAGE_SIZE"))
+ _wmgr_page_size_dblks = u_int32_value(buff);
+ else if(::strstr(buff, "JRNL_WMGR_PAGES"))
+ _wmgr_num_pages = u_int32_value(buff);
+ else if(::strstr(buff, "JRNL_RMGR_PAGE_SIZE"))
+ _rmgr_page_size_dblks = u_int32_value(buff);
+ else if(::strstr(buff, "JRNL_RMGR_PAGES"))
+ _rmgr_num_pages = u_int32_value(buff);
+ else if(::strstr(buff, "seconds"))
+ {
+ _ts.tv_sec = u_int32_value(buff);
+ _tm_ptr = ::localtime(&_ts.tv_sec);
+ }
+ else if(::strstr(buff, "nanoseconds"))
+ _ts.tv_nsec = u_int32_value(buff);
+ }
+ jinfs.close();
+}
+
+const u_int16_t
+jinf::u_int16_value(char* line) const throw (jexception)
+{
+ return ::atoi(find_value(line));
+}
+
+const u_int32_t
+jinf::u_int32_value(char* line) const throw (jexception)
+{
+ return ::atol(find_value(line));
+}
+
+const std::string&
+jinf::string_value(std::string& str, char* line) const throw (jexception)
+{
+ str.assign(find_value(line));
+ return str;
+}
+
+const char*
+jinf::find_value(char* line) const throw (jexception)
+{
+ const char* target1_str = "value=\"";
+ int target2_char = '\"';
+ char* t1 = ::strstr(line, target1_str);
+ if (t1 == NULL)
+ throw jexception(jerrno::JERR_JINF_NOVALUESTR, line, "jinf",
"find_value");
+ t1 += ::strlen(target1_str);
+
+ char* t2 = ::strchr(t1, target2_char);
+ if (t2 == NULL)
+ throw jexception(jerrno::JERR_JINF_BADVALUESTR, line, "jinf",
"find_value");
+ *t2 = '\0';
+ return t1;
+}
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/jinf.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.hpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/jinf.hpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -47,18 +47,57 @@
class jinf
{
private:
- const std::string& _jid;
- const std::string& _jdir;
- const std::string& _base_filename;
- const timespec& _ts;
+ u_int8_t _jver;
+ std::string _jid;
+ std::string _jdir;
+ std::string _base_filename;
+ timespec _ts;
+ u_int16_t _num_files;
+ u_int32_t _fsize_sblks;
+ u_int16_t _sblk_size_dblks;
+ u_int32_t _dblk_size;
+ u_int32_t _wmgr_page_size_dblks;
+ u_int32_t _wmgr_num_pages;
+ u_int32_t _rmgr_page_size_dblks;
+ u_int32_t _rmgr_num_pages;
tm* _tm_ptr;
+ bool _valid_flag;
+ bool _analyzed_flag;
+ u_int16_t _start_file;
public:
+ jinf(const std::string& jinf_filename, bool validate_flag) throw
(jexception);
jinf(const std::string& jid, const std::string& jdir, const
std::string& base_filename,
const timespec& ts);
~jinf();
- const std::string& str(std::string& ss) const;
+ void validate() throw (jexception);
+ const u_int16_t analyze() throw (jexception);
+
+ inline const u_int8_t jver() const { return _jver; }
+ inline const std::string& jid() const { return _jid; }
+ inline const std::string& jdir() const { return _jdir; }
+ inline const std::string& base_filename() const { return _base_filename; }
+ inline const timespec& ts() const { return _ts; }
+ inline const u_int16_t num_files() const { return _num_files; }
+ inline const u_int32_t fsize_sblks() const { return _fsize_sblks; }
+ inline const u_int16_t sblk_size_dblks() const { return _sblk_size_dblks; }
+ inline const u_int32_t dblk_size() const { return _dblk_size; }
+ inline const u_int32_t wmgr_page_size_dblks() const { return
_wmgr_page_size_dblks; }
+ inline const u_int32_t wmgr_num_pages() const { return _wmgr_num_pages; }
+ inline const u_int32_t rmgr_page_size_dblks() const { return
_rmgr_page_size_dblks; }
+ inline const u_int32_t rmgr_num_pages() const { return _rmgr_num_pages; }
+ const u_int16_t get_start_file() throw (jexception);
+
+ const std::string& to_string(std::string& s) const;
+ const std::string& xml_str(std::string& s) const;
+
+ private:
+ void read(const std::string& jinf_filename) throw (jexception);
+ const u_int16_t u_int16_value(char* line) const throw (jexception);
+ const u_int32_t u_int32_value(char* line) const throw (jexception);
+ const std::string& string_value(std::string& str, char* line) const throw
(jexception);
+ const char* find_value(char* line) const throw (jexception);
};
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/lfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/lfh.cpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/lfh.cpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -42,8 +42,9 @@
nlfh()
{}
-lfh::lfh(const std::string& fbasename, const u_int16_t fid) throw (jexception*):
- nlfh(fbasename, fid)
+lfh::lfh(const std::string& fbasename, const u_int16_t fid, rcvdat const * const ro)
+ throw (jexception*):
+ nlfh(fbasename, fid, ro)
{}
lfh::~lfh()
Modified: store/trunk/cpp/lib/jrnl/lfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/lfh.hpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/lfh.hpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -56,7 +56,8 @@
{
public:
lfh();
- lfh(const std::string& fbasename, const u_int16_t fid) throw (jexception*);
+ lfh(const std::string& fbasename, const u_int16_t fid, rcvdat const * const
ro)
+ throw (jexception*);
virtual ~lfh();
};
Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -62,7 +62,8 @@
#endif
{}
-nlfh::nlfh(const std::string& fbasename, const u_int16_t fid) throw (jexception):
+nlfh::nlfh(const std::string& fbasename, const u_int16_t fid, const rcvdat* const
ro)
+ throw (jexception):
_fname(),
_fid(fid),
_rd_fh(-1),
@@ -78,7 +79,7 @@
_wr_cmpl_cnt_dblks(0)
#endif
{
- initialize(fbasename, fid);
+ initialize(fbasename, fid, ro);
open();
}
@@ -88,7 +89,8 @@
}
void
-nlfh::initialize(const std::string& fbasename, const u_int16_t fid) throw
(jexception)
+nlfh::initialize(const std::string& fbasename, const u_int16_t fid,
+ const rcvdat* const ro) throw (jexception)
{
std::stringstream ss;
@@ -104,53 +106,78 @@
if (::stat(_fname.c_str(), &s))
{
#endif
- const size_t sblksize = JRNL_DBLK_SIZE * JRNL_SBLK_SIZE;
- const size_t writesize = sblksize * JRNL_WMGR_PAGE_SIZE;
-
- // NOTE: The journal file size is always one sblock bigger than the specified
journal file
- // size (JRNL_FILE_SIZE in jcfg.hpp), which is the data content size. The extra
block is
- // for the journal file header which precedes all data on each file and is
exactly one
- // softblock in size.
- u_int32_t nsblks = JRNL_FILE_SIZE + 1;
- void* nullbuf = NULL;
- if (::posix_memalign(&nullbuf, sblksize, writesize))
+ if (ro)
{
- ss << ": posix_memalign() failed: size=" << writesize
<< " blk_size=" << sblksize;
- ss << " errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str(), "nlfh",
"initialize");
+ if (!ro->_empty)
+ {
+ if (ro->_ffid == _fid)
+ {
+ _rd_subm_cnt_dblks = ro->_fro/JRNL_DBLK_SIZE;
+ _rd_cmpl_cnt_dblks = ro->_fro/JRNL_DBLK_SIZE;
+ }
+ if (ro->_lfid == _fid)
+ {
+ _wr_subm_cnt_dblks = ro->_eo/JRNL_DBLK_SIZE;
+ _wr_cmpl_cnt_dblks = ro->_eo/JRNL_DBLK_SIZE;
+ }
+ else
+ {
+ _wr_subm_cnt_dblks = JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1);
+ _wr_cmpl_cnt_dblks = JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1);
+ }
+ _rec_enqcnt = ro->_enq_cnt_list[_fid];
+ }
}
- ::memset(nullbuf, 0, writesize);
-
- int fh = ::open(_fname.c_str(), O_WRONLY | O_CREAT | O_DIRECT,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
- if (fh < 0)
+ else
{
- ss << ": open() failed:" << "\" errno="
<< errno;
- ::free(nullbuf);
- throw jexception(jerrno::JERR_NLFH_OPENWR, ss.str(), "nlfh",
"initialize");
- }
+ const size_t sblksize = JRNL_DBLK_SIZE * JRNL_SBLK_SIZE;
+ const size_t writesize = sblksize * JRNL_WMGR_PAGE_SIZE;
- while (nsblks > 0)
- {
- u_int32_t this_write_sblks = nsblks >= JRNL_WMGR_PAGE_SIZE ?
- JRNL_WMGR_PAGE_SIZE : nsblks;
- if (::write(fh, (void*)nullbuf, this_write_sblks * sblksize) == -1)
+ // NOTE: The journal file size is always one sblock bigger than the specified
journal
+ // file size (JRNL_FILE_SIZE in jcfg.hpp), which is the data content size.
The extra
+ // block is for the journal file header which precedes all data on each file
and is
+ // exactly one softblock in size.
+ u_int32_t nsblks = JRNL_FILE_SIZE + 1;
+ void* nullbuf = NULL;
+ if (::posix_memalign(&nullbuf, sblksize, writesize))
{
- std::stringstream ss;
- ss << ": wr_size=" << (this_write_sblks * sblksize)
<< " errno=" << errno;
- ::close(fh);
+ ss << ": posix_memalign() failed: size=" <<
writesize << " blk_size=" << sblksize;
+ ss << " errno=" << errno;
+ throw jexception(jerrno::JERR__MALLOC, ss.str(), "nlfh",
"initialize");
+ }
+ ::memset(nullbuf, 0, writesize);
+
+ int fh = ::open(_fname.c_str(), O_WRONLY | O_CREAT | O_DIRECT,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
+ if (fh < 0)
+ {
+ ss << ": open() failed:" << "\"
errno=" << errno;
::free(nullbuf);
- throw jexception(jerrno::JERR_NLFH_WRITE, ss.str(), "nlfh",
"initialize");
+ throw jexception(jerrno::JERR_NLFH_OPENWR, ss.str(), "nlfh",
"initialize");
}
- nsblks -= this_write_sblks;
- }
- // Clean up
- ::free(nullbuf);
- if (::close(fh))
- {
- ss << ": errno=" << errno;
- throw jexception(jerrno::JERR_NLFH_CLOSE, ss.str(), "jcntl",
"initialize");
+ while (nsblks > 0)
+ {
+ u_int32_t this_write_sblks = nsblks >= JRNL_WMGR_PAGE_SIZE ?
+ JRNL_WMGR_PAGE_SIZE : nsblks;
+ if (::write(fh, (void*)nullbuf, this_write_sblks * sblksize) == -1)
+ {
+ std::stringstream ss;
+ ss << ": wr_size=" << (this_write_sblks *
sblksize) << " errno=" << errno;
+ ::close(fh);
+ ::free(nullbuf);
+ throw jexception(jerrno::JERR_NLFH_WRITE, ss.str(), "nlfh",
"initialize");
+ }
+ nsblks -= this_write_sblks;
+ }
+
+ // Clean up
+ ::free(nullbuf);
+ if (::close(fh))
+ {
+ ss << ": errno=" << errno;
+ throw jexception(jerrno::JERR_NLFH_CLOSE, ss.str(), "jcntl",
"initialize");
+ }
}
#ifdef RHM_JOWRITE
}
@@ -158,8 +185,31 @@
}
bool
-nlfh::reset()
+nlfh::reset(const rcvdat* const ro)
{
+ if (ro)
+ {
+ if (!ro->_empty)
+ {
+ if (ro->_ffid == _fid)
+ {
+ _rd_subm_cnt_dblks = ro->_fro/JRNL_DBLK_SIZE;
+ _rd_cmpl_cnt_dblks = ro->_fro/JRNL_DBLK_SIZE;
+ }
+ if (ro->_lfid == _fid)
+ {
+ _wr_subm_cnt_dblks = ro->_eo/JRNL_DBLK_SIZE;
+ _wr_cmpl_cnt_dblks = ro->_eo/JRNL_DBLK_SIZE;
+ }
+ else
+ {
+ _wr_subm_cnt_dblks = JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1);
+ _wr_cmpl_cnt_dblks = JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1);
+ }
+ _rec_enqcnt = ro->_enq_cnt_list[_fid];
+ return true;
+ }
+ }
#ifndef RHM_WRONLY
// Journal overflow test - checks if the file to be reset still contains enqueued
records
if (_rec_enqcnt)
@@ -316,10 +366,15 @@
return _wr_cmpl_cnt_dblks;
}
-const bool
-nlfh::will_fit(const size_t rec_size) const
+// Debug function
+const std::string&
+nlfh::status_str(std::string& s) const
{
- return rec_size <= wr_remaining_dblks() * JRNL_DBLK_SIZE;
+ std::stringstream ss;
+ ss << "nlfh[" << _fid << "]: ws=" <<
_wr_subm_cnt_dblks << " wc=" << _wr_cmpl_cnt_dblks;
+ ss << " rs=" << _rd_subm_cnt_dblks << " rc="
<< _rd_cmpl_cnt_dblks;
+ s.assign(ss.str());
+ return s;
}
// Private functions
Modified: store/trunk/cpp/lib/jrnl/nlfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.hpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/nlfh.hpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -44,6 +44,7 @@
#include <string>
#include <jrnl/file_hdr.hpp>
+#include <jrnl/rcvdat.hpp>
namespace rhm
{
@@ -71,12 +72,13 @@
public:
nlfh();
// Constructors with implicit initialize() and open()
- nlfh(const std::string& fbasename, const u_int16_t fid) throw (jexception);
+ nlfh(const std::string& fbasename, const u_int16_t fid, const rcvdat* const
ro)
+ throw (jexception);
virtual ~nlfh();
- virtual void initialize(const std::string& fbasename, const u_int16_t fid)
- throw (jexception);
- virtual bool reset(); ///< Reset file handle pointers prior to
reuse
+ virtual void initialize(const std::string& fbasename, const u_int16_t fid,
+ const rcvdat* const ro) throw (jexception);
+ virtual bool reset(const rcvdat* const ro = NULL);
inline const std::string& fname() const { return _fname; }
inline const u_int16_t fid() const { return _fid; }
@@ -129,7 +131,8 @@
{ return _wr_subm_cnt_dblks - _wr_cmpl_cnt_dblks; }
inline const bool wr_file_rotate() const { return is_wr_full(); }
- const bool will_fit(const size_t rec_size) const;
+ // Debug aid
+ const std::string& status_str(std::string& s) const;
protected:
virtual void open() throw (jexception);
Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -131,7 +131,7 @@
_pg_offset_dblks = 0;
_aio_evt_rem = 0;
clean();
- _emap.clear();
+// _emap.clear();
// 1. Allocate page memory (as a single block)
size_t pagesize = _pages * _pagesize * _sblksize;
Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -60,7 +60,7 @@
*/
enum _iores
{
- RHM_IORES_SUCCESS, ///< Success: IO operation completed noramlly.
+ RHM_IORES_SUCCESS = 0, ///< Success: IO operation completed noramlly.
RHM_IORES_AIO_WAIT, ///< IO operation suspended as all pages in cache are
waiting for AIO.
RHM_IORES_EMPTY, ///< During read operations, nothing further is available
to read.
RHM_IORES_FULL, ///< During write operations, the journal files are full.
Added: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp (rev 0)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -0,0 +1,74 @@
+/**
+* \file rcvdat.hpp
+*
+* Red Hat Messaging - Message Journal
+*
+* Contains structure for recovery status and offset data.
+*
+* \author Kim van der Riet
+*
+* Copyright 2007 Red Hat, Inc.
+*
+* This file is part of Red Hat Messaging.
+*
+* Red Hat Messaging is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License as published by the Free Software Foundation; either
+* version 2.1 of the License, or (at your option) any later version.
+*
+* This library is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this library; if not, write to the Free Software
+* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+* USA
+*
+* The GNU Lesser General Public License is available in the file COPYING.
+*/
+
+#ifndef rhm_journal_rcvdat_hpp
+#define rhm_journal_rcvdat_hpp
+
+#include <vector>
+
+namespace rhm
+{
+namespace journal
+{
+
+ struct rcvdat
+ {
+ bool _empty; ///< Journal data files empty
+ u_int16_t _ffid; ///< First file id
+ size_t _fro; ///< First record offset in ffid
+ u_int16_t _lfid; ///< Last file id
+ size_t _eo; ///< End offset (first byte past last record)
+ u_int64_t _h_rid; ///< Highest rid found
+ std::vector<u_int32_t> _enq_cnt_list; ///< Number enqueued records
found for each file
+ rcvdat():
+ _empty(true),
+ _ffid(0),
+ _fro(0),
+ _lfid(0),
+ _eo(0),
+ _h_rid(0),
+ _enq_cnt_list(JRNL_NUM_FILES)
+ {}
+ void reset()
+ {
+ _empty=true;
+ _ffid=0;
+ _fro=0;
+ _lfid=0;
+ _eo=0;
+ _h_rid=0;
+ _enq_cnt_list.clear();
+ }
+ };
+}
+}
+
+#endif
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -72,7 +72,8 @@
if (_aio_evt_rem)
get_events();
//std::cout << " [a pi=" << _pg_index << " d="
<< dblks_rem() << " f=" <<
(_rrfc.empty()?"T":"F") << "]" << std::flush;
- if(dblks_rem() == 0 && _rrfc.is_full())
+// if(dblks_rem() == 0 && _rrfc.is_full())
+ if(dblks_rem() == 0 && _rrfc.is_compl())
{
aio_cycle(); // check if any AIOs have returned
return RHM_IORES_EMPTY;
@@ -109,7 +110,8 @@
while (true)
{
//std::cout << " [f pi=" << _pg_index << " d="
<< dblks_rem() << " f=" <<
(_rrfc.empty()?"T":"F") << "]" << std::flush;
- if(dblks_rem() == 0 && _rrfc.is_full())
+// if(dblks_rem() == 0 && _rrfc.is_full())
+ if(dblks_rem() == 0 && _rrfc.is_compl())
{
aio_cycle(); // check if any AIOs have returned
return RHM_IORES_EMPTY;
@@ -138,13 +140,16 @@
bool is_enq = false;
try
{
+//std::cout << " ?" << _hdr._rid << std::flush;
fid = _emap.get_fid(_hdr._rid);
+//std::cout << "-ok" << std::flush;
is_enq = true;
}
catch (jexception& e)
{
if (e.err_code() != jerrno::JERR_EMAP_NOTFOUND)
throw e;
+//std::cout << "-nf" << std::flush;
}
#endif
if (is_enq) // ok, this record is enqueued, check it, then read it...
@@ -440,6 +445,8 @@
u_int32_t rd_size = file_rem_dblks > pg_size_dblks ? pg_size_dblks :
file_rem_dblks;
if (rd_size)
{
+//std::string s;
+//std::cout << " (" << _rrfc.file_handle()->status(s) <<
")";
//std::cout << " <frd=" << file_rem_dblks <<
">" << std::flush;
int16_t pi = (i + first_uninit) % _pages;
// TODO: For perf, combine contiguous pages into single read
@@ -453,7 +460,7 @@
_aio_evt_rem++;
_page_cb_arr[pi]._state = AIO_PENDING;
_page_cb_arr[pi]._rfh = _rrfc.file_handle();
-//std::cout << "{r^" << pi << "}" <<
std::flush;
+//std::cout << "{r^" << pi << "," << rd_size
<< "}" << std::flush;
}
else // If there is nothing to read for this page, neither will there be for the
others...
//{std::cout << "&" << std::flush;
Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -124,8 +124,6 @@
inline const u_int32_t aio_outstanding_dblks()
{ return _curr_fh->rd_aio_outstanding_dblks(); }
inline const bool file_rotate() const { return _curr_fh->rd_file_rotate(); }
-
- inline const bool will_fit(const size_t size) const { return
_curr_fh->will_fit(size); }
}; // class rrfc
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -52,17 +52,32 @@
wrfc::~wrfc() {}
+// void
+// wrfc::initialize(u_int32_t nfiles, nlfh** fh_arr, u_int32_t fh_index, u_int64_t rid)
throw (jexception)
+// {
+// rrfc::initialize(nfiles, fh_arr, fh_index);
+// _rid = rid;
+// _reset_ok = false;
+// }
void
-wrfc::initialize(u_int32_t nfiles, nlfh** fh_arr, u_int32_t fh_index) throw (jexception)
+wrfc::initialize(u_int32_t nfiles, nlfh** fh_arr, rcvdat* rdp) throw (jexception)
{
- rrfc::initialize(nfiles, fh_arr, fh_index);
+ if (rdp)
+ {
+ rrfc::initialize(nfiles, fh_arr, rdp->_lfid);
+ _rid = rdp->_h_rid + 1;
+ _reset_ok = true;
+ }
+ else
+ {
+ rrfc::initialize(nfiles, fh_arr);
#ifdef DRHM_TESTVALS
- // TODO: Find method of specifying 64-bit literals under gcc with -pedantic option
- _rid = u_int64_t(0xffeeddcc) << 32;
+ _rid = u_int64_t(0xffeeddcc) << 32;
#else
- _rid = 0;
+ _rid = 0;
#endif
- _reset_ok = false;
+ _reset_ok = false;
+ }
}
bool
Modified: store/trunk/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.hpp 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/lib/jrnl/wrfc.hpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -69,8 +69,14 @@
* each of which correspond to one of the physical files.
* \param fh_index Initial index of journal file. Default = 0.
*/
- void initialize(u_int32_t nfiles, nlfh** fh_arr, u_int32_t fh_index = 0)
- throw (jexception);
+// void initialize(u_int32_t nfiles, nlfh** fh_arr, u_int32_t fh_index = 0,
+// #ifdef DRHM_TESTVALS
+// u_int64_t rid = u_int64_t(0xffeeddcc) << 32
+// #else
+// u_int64_t rid = 0
+// #endif
+// ) throw (jexception);
+ void initialize(u_int32_t nfiles, nlfh** fh_arr, rcvdat* rdp = NULL) throw
(jexception);
/**
* \brief Rotate active file handle to next file in rotating file group.
Modified: store/trunk/cpp/tests/jrnl/Makefile.rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-20 07:46:22 UTC (rev 929)
+++ store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-20 20:35:17 UTC (rev 930)
@@ -68,12 +68,12 @@
msg_producer.o \
msg_consumer.o \
-JTEST_FILES = jtest
+JTEST_FILES = jtest jrnl_scope_test
CXX = g++
CXXINCLUDES = -I. -I../../lib -I../../../../qpid/cpp/src -I../../../../qpid/cpp/src/gen
CXXFLAGS = $(RHM_DEFINES) -Wall -Wextra -Werror -Wno-shadow -Wpointer-arith -Wcast-qual
-Wcast-align -Wno-long-long -Wvolatile-register-var -Winvalid-pch -Wno-system-headers
-pedantic -ggdb -O0 -pthread $(CXXINCLUDES)
-LDFLAGS = -lpthread -laio -lrt
+LDFLAGS = -lpthread -laio -lrt -lqpidcommon -L/home/kpvdr/qpid/cpp/src/.libs
.SUFFIXES:
.SUFFIXES: .cpp .o
@@ -104,6 +104,8 @@
jtest: $(JRNL_OBJ_FILES) $(JTEST_OBJ_FILES)
+jrnl_scope_test: $(JRNL_OBJ_FILES) jrnl_scope_test.cpp
+
jrtest: RHM_DEFINES = -DRHM_JOWRITE -DRHM_RDONLY -DRHM_TESTVALS
jrtest: $(JTEST_FILES)
@@ -119,6 +121,7 @@
clean:
@rm -f *.o
@rm -f jtest
+ @rm -f jrnl_scope_test
clean-data:
@rm -rf jdata
Added: store/trunk/cpp/tests/jrnl/jrnl_scope_test.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jrnl_scope_test.cpp (rev 0)
+++ store/trunk/cpp/tests/jrnl/jrnl_scope_test.cpp 2007-09-20 20:35:17 UTC (rev 930)
@@ -0,0 +1,619 @@
+#include <iostream>
+#include <jrnl/data_tok.hpp>
+#include <jrnl/jcntl.hpp>
+#include <sstream>
+
+using namespace std;
+
+#define TEST_ITERATIONS 5
+#define NUM_TESTS 14
+#define NUM_MSGS 5
+#define MAX_AIO_SLEEPS 500
+#define AIO_SLEEP_TIME 1000
+
+class jrnl_scope_test
+{
+ static const char* iores_str[];
+public:
+ jrnl_scope_test() {}
+ ~jrnl_scope_test() {}
+
+ int run()
+ {
+ int res;
+ for (int t=1; t<=NUM_TESTS; t++)
+ if ((res = run(t, TEST_ITERATIONS)))
+ break;
+ return res;
+ }
+
+ int run(int test_num, int num_iter)
+ {
+ try
+ {
+ switch(test_num)
+ {
+ case 1:
+ return test_1(num_iter);
+ case 2:
+ return test_2(num_iter);
+ case 3:
+ return test_3(num_iter);
+ case 4:
+ return test_4(num_iter);
+ case 5:
+ return test_5(num_iter);
+ case 6:
+ return test_6(num_iter);
+ case 7:
+ return test_7(num_iter);
+ case 8:
+ return test_8(num_iter);
+ case 9:
+ return test_9(num_iter);
+ case 10:
+ return test_10(num_iter);
+ case 11:
+ return test_11(num_iter);
+ case 12:
+ return test_12(num_iter);
+ case 13:
+ return test_13(num_iter);
+ case 14:
+ return test_14(num_iter);
+ default:
+ cout << " unknown test: " << test_num <<
endl;
+ return 1;
+ }
+ }
+ catch (rhm::journal::jexception& e)
+ {
+ cout << e << endl;
+ return 2;
+ }
+ }
+
+protected:
+ int test_1(int num_iter)
+ {
+ cout << " 1. instance: ";
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ rhm::journal::jcntl jc("scope_test_01", "jdata",
"t01");
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int test_2(int num_iter)
+ {
+ cout << " 2. instance_ptr: ";
+ {
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ rhm::journal::jcntl* jc = new
rhm::journal::jcntl("scope_test_02", "jdata", "t02");
+ if (!jc)
+ {
+ cout << " failed, jc==NULL" << endl;
+ return 1;
+ }
+ delete jc;
+ }
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int test_3(int num_iter)
+ {
+ cout << " 3. init: ";
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ rhm::journal::jcntl jc("scope_test_03", "jdata",
"t03");
+ jc.initialize();
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int test_4(int num_iter)
+ {
+ cout << " 4. init_ptr: ";
+ {
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ rhm::journal::jcntl* jc = new
rhm::journal::jcntl("scope_test_04", "jdata", "t04");
+ if (!jc)
+ {
+ cout << "failed" << endl;
+ return 1;
+ }
+ jc->initialize();
+ delete jc;
+ }
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int test_5(int num_iter)
+ {
+ cout << " 5. init_recover: ";
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ {
+ rhm::journal::jcntl jc("scope_test_05", "jdata",
"t05");
+ jc.initialize();
+ }
+ {
+ rhm::journal::jcntl jc("scope_test_05", "jdata",
"t05");
+ jc.recover();
+ }
+ {
+ rhm::journal::jcntl jc("scope_test_05", "jdata",
"t05");
+ jc.recover();
+ jc.recovered();
+ }
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int test_6(int num_iter)
+ {
+ cout << " 6. init_recover_ptr: ";
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ {
+ rhm::journal::jcntl* jc = new
rhm::journal::jcntl("scope_test_06", "jdata", "t06");
+ if (!jc)
+ {
+ cout << "failed" << endl;
+ return 1;
+ }
+ jc->initialize();
+ delete jc;
+ }
+ {
+ rhm::journal::jcntl* jc = new
rhm::journal::jcntl("scope_test_06", "jdata", "t06");
+ if (!jc)
+ {
+ cout << "failed" << endl;
+ return 1;
+ }
+ jc->recover();
+ delete jc;
+ }
+ {
+ rhm::journal::jcntl* jc = new
rhm::journal::jcntl("scope_test_06", "jdata", "t06");
+ if (!jc)
+ {
+ cout << "failed" << endl;
+ return 1;
+ }
+ jc->recover();
+ jc->recovered();
+ delete jc;
+ }
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int test_7(int num_iter)
+ {
+ cout << " 7. enq: ";
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ {
+ rhm::journal::jcntl jc("scope_test_07", "jdata",
"t07");
+ jc.initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ std::stringstream ss;
+ ss << "Message " << m;
+ if (enq_msg(&jc, ss.str().c_str(), ss.str().size()))
+ return 1;
+ }
+ }
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int test_8(int num_iter)
+ {
+ cout << " 8. enq_ptr: ";
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ {
+ rhm::journal::jcntl* jc = new
rhm::journal::jcntl("scope_test_08", "jdata", "t08");
+ if (!jc)
+ {
+ cout << "failed" << endl;
+ return 1;
+ }
+ jc->initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ std::stringstream ss;
+ ss << "Message " << m;
+ if (enq_msg(jc, ss.str().c_str(), ss.str().size()))
+ {
+ delete jc;
+ return 1;
+ }
+ }
+ delete jc;
+ }
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int test_9(int num_iter)
+ {
+ cout << " 9. recover_read: ";
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ {
+ rhm::journal::jcntl jc("scope_test_09", "jdata",
"t09");
+ jc.initialize();
+ for (int m=0; m<NUM_MSGS; m++) // 12288
+ {
+ std::stringstream ss;
+ ss << "Message " << m;
+ if (enq_msg(&jc, ss.str().c_str(), ss.str().size()))
+ return 1;
+ }
+ }
+ {
+ rhm::journal::jcntl jc("scope_test_09", "jdata",
"t09");
+ jc.recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ if (read_msg(&jc))
+ return 1;
+ }
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int test_10(int num_iter)
+ {
+ cout << " 10. recover_read_ptr: ";
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ {
+ rhm::journal::jcntl* jc = new
rhm::journal::jcntl("scope_test_10", "jdata", "t10");
+ if (!jc)
+ {
+ cout << "failed" << endl;
+ return 1;
+ }
+ jc->initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ std::stringstream ss;
+ ss << "Message " << m;
+ if (enq_msg(jc, ss.str().c_str(), ss.str().size()))
+ {
+ delete jc;
+ return 1;
+ }
+ }
+ delete jc;
+ }
+ {
+ rhm::journal::jcntl* jc = new
rhm::journal::jcntl("scope_test_10", "jdata", "t10");
+ if (!jc)
+ {
+ cout << "failed" << endl;
+ return 1;
+ }
+ jc->recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ if (read_msg(jc))
+ {
+ delete jc;
+ return 1;
+ }
+ delete jc;
+ }
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int test_11(int num_iter)
+ {
+ cout << " 11. recover_read_read: ";
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ {
+ rhm::journal::jcntl jc("scope_test_11", "jdata",
"t11");
+ jc.initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ std::stringstream ss;
+ ss << "Message " << m;
+ if (enq_msg(&jc, ss.str().c_str(), ss.str().size()))
+ return 1;
+ }
+ }
+ {
+ rhm::journal::jcntl jc("scope_test_11", "jdata",
"t11");
+ jc.recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ if (read_msg(&jc))
+ return 1;
+ jc.recovered();
+ for (int m=0; m<NUM_MSGS; m++)
+ if (read_msg(&jc))
+ return 1;
+ }
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int test_12(int num_iter)
+ {
+ cout << " 12. recover_read_read_ptr: ";
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ {
+ rhm::journal::jcntl* jc = new
rhm::journal::jcntl("scope_test_12", "jdata", "t12");
+ if (!jc)
+ {
+ cout << "failed" << endl;
+ return 1;
+ }
+ jc->initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ std::stringstream ss;
+ ss << "Message " << m;
+ if (enq_msg(jc, ss.str().c_str(), ss.str().size()))
+ {
+ delete jc;
+ return 1;
+ }
+ }
+ delete jc;
+ }
+ {
+ rhm::journal::jcntl* jc = new
rhm::journal::jcntl("scope_test_12", "jdata", "t12");
+ if (!jc)
+ {
+ cout << "failed" << endl;
+ return 1;
+ }
+ jc->recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ if (read_msg(jc))
+ {
+ delete jc;
+ return 1;
+ }
+ jc->recovered();
+ for (int m=0; m<NUM_MSGS; m++)
+ if (read_msg(jc))
+ {
+ delete jc;
+ return 1;
+ }
+ delete jc;
+ }
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int test_13(int num_iter)
+ {
+ cout << " 13. recover_read_read_deq: ";
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ {
+ rhm::journal::jcntl jc("scope_test_13", "jdata",
"t13");
+ jc.initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ std::stringstream ss;
+ ss << "Message " << m;
+ if (enq_msg(&jc, ss.str().c_str(), ss.str().size()))
+ return 1;
+ }
+ }
+ {
+ rhm::journal::jcntl jc("scope_test_13", "jdata",
"t13");
+ jc.recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ if (read_msg(&jc))
+ return 1;
+ jc.recovered();
+ for (int m=0; m<NUM_MSGS; m++)
+ if (read_msg(&jc))
+ return 1;
+ for (int m=0; m<NUM_MSGS; m++)
+ if (deq_msg(&jc, m))
+ return 1;
+ }
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int test_14(int num_iter)
+ {
+ cout << " 14. recover_read_read_deq_ptr: ";
+ for (int i=0; i<num_iter; i++)
+ {
+ cout << "." << flush;
+ {
+ rhm::journal::jcntl* jc = new
rhm::journal::jcntl("scope_test_14", "jdata", "t14");
+ if (!jc)
+ {
+ cout << "failed" << endl;
+ return 1;
+ }
+ jc->initialize();
+ for (int m=0; m<NUM_MSGS; m++)
+ {
+ std::stringstream ss;
+ ss << "Message " << m;
+ if (enq_msg(jc, ss.str().c_str(), ss.str().size()))
+ {
+ delete jc;
+ return 1;
+ }
+ }
+ delete jc;
+ }
+ {
+ rhm::journal::jcntl* jc = new
rhm::journal::jcntl("scope_test_14", "jdata", "t14");
+ if (!jc)
+ {
+ cout << "failed" << endl;
+ return 1;
+ }
+ jc->recover();
+ for (int m=0; m<NUM_MSGS; m++)
+ if (read_msg(jc))
+ {
+ delete jc;
+ return 1;
+ }
+ jc->recovered();
+ for (int m=0; m<NUM_MSGS; m++)
+ if (read_msg(jc))
+ {
+ delete jc;
+ return 1;
+ }
+ for (int m=0; m<NUM_MSGS; m++)
+ if (deq_msg(jc, m))
+ {
+ delete jc;
+ return 1;
+ }
+ delete jc;
+ }
+ }
+ cout << " ok" << endl;
+ return 0;
+ }
+
+ int enq_msg(rhm::journal::jcntl* jc, const char* msg, const size_t msg_size)
+ {
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ rhm::journal::iores res = jc->enqueue_data(msg, msg_size, dtp);
+ if (res != rhm::journal::RHM_IORES_SUCCESS)
+ {
+ cout << "jcntl::enqueue_data() returned " <<
iores_str[res] << ". failed" << endl;
+ delete dtp;
+ return 1;
+ }
+ return 0;
+ }
+
+ int deq_msg(rhm::journal::jcntl* jc, u_int64_t drid)
+ {
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ dtp->set_wstate(rhm::journal::data_tok::ENQ);
+ dtp->set_rid(drid);
+ rhm::journal::iores res = jc->dequeue_data(dtp);
+ if (res != rhm::journal::RHM_IORES_SUCCESS)
+ {
+ cout << "jcntl::dequeue_data() returned " <<
iores_str[res] << ". failed" << endl;
+ delete dtp;
+ return 1;
+ }
+ return 0;
+ }
+
+ int read_msg(rhm::journal::jcntl* jc)
+ {
+ char buff[128];
+ memset(buff, '?', 127);
+ buff[127] = '\0';
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+ dtp->set_wstate(rhm::journal::data_tok::ENQ);
+ unsigned aio_sleep_cnt = 0;
+ bool read = false;
+ while (!read)
+ {
+ rhm::journal::iores res = jc->read_data(buff, 127, dtp);
+ switch (res)
+ {
+ case rhm::journal::RHM_IORES_SUCCESS:
+ read = true;
+ break;
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+ {
+ cout << "Exceeded max count for RHM_IORES_AIO_WAIT.
failed" << endl;
+ delete dtp;
+ return 1;
+ }
+ ::usleep(AIO_SLEEP_TIME);
+ break;
+ default:
+ cout << "jcntl::read_data() returned " <<
iores_str[res] << ". failed" << endl;
+ delete dtp;
+ return 1;
+ }
+ }
+ return 0;
+ }
+};
+
+int main(int argc, char** argv)
+{
+ int test_num = 0;
+ int num_iter = TEST_ITERATIONS;
+ if (argc > 1)
+ test_num = atoi(argv[1]);
+ if (argc > 2)
+ num_iter = atoi(argv[2]);
+ if (num_iter < 1)
+ {
+ cout << "num_iter=" << num_iter << endl;
+ return 1;
+ }
+ cout << "jrnl_scope_test";
+ if (test_num)
+ cout << " test=" << test_num;
+ cout << ":" << endl;
+ jrnl_scope_test jst;
+ int res = test_num ? jst.run(test_num, num_iter) : jst.run();
+ if (res)
+ {
+ cout << "failed res=" << res << endl;
+ return res;
+ }
+ cout << "ok" << endl;
+ return 0;
+}
+
+const char* jrnl_scope_test::iores_str[] = {
+ "RHM_IORES_SUCCESS",
+ "RHM_IORES_AIO_WAIT",
+ "RHM_IORES_EMPTY",
+ "RHM_IORES_FULL",
+ "RHM_IORES_BUSY"
+ };
Modified: store/trunk/cpp/tests/jrnl/tests.ods
===================================================================
(Binary files differ)