Author: kpvdr
Date: 2007-11-30 17:57:44 -0500 (Fri, 30 Nov 2007)
New Revision: 1407
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/deq_rec.cpp
store/trunk/cpp/lib/jrnl/deq_rec.hpp
store/trunk/cpp/lib/jrnl/enq_rec.cpp
store/trunk/cpp/lib/jrnl/enq_rec.hpp
store/trunk/cpp/lib/jrnl/jcfg.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jdir.cpp
store/trunk/cpp/lib/jrnl/jexception.hpp
store/trunk/cpp/lib/jrnl/jinf.cpp
store/trunk/cpp/lib/jrnl/jinf.hpp
store/trunk/cpp/lib/jrnl/jrec.hpp
store/trunk/cpp/lib/jrnl/lfh.cpp
store/trunk/cpp/lib/jrnl/lfh.hpp
store/trunk/cpp/lib/jrnl/nlfh.cpp
store/trunk/cpp/lib/jrnl/nlfh.hpp
store/trunk/cpp/lib/jrnl/pmgr.cpp
store/trunk/cpp/lib/jrnl/rcvdat.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/txn_rec.cpp
store/trunk/cpp/lib/jrnl/txn_rec.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
store/trunk/cpp/tests/jrnl/unit_test_jdir.cpp
store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp
Log:
The two #defines from jcfg.hpp, JRNL_FILE_SIZE (the journal file size in sblks) and
JRNL_NUM_FILES, are now parameterized as far as BdbMessageStore, where they are defined as
consts. Hardwired minima apply to both. Also some minor tidy-up around error handling for
::malloc().
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -60,6 +60,8 @@
enqueueXidDb(&env, 0),
dequeueXidDb(&env, 0),
prepareXidDb(&env, 0),
+ numJrnlFiles(8), // TODO -
make param
+ jrnlFsizeSblks(3072), // TODO -
make param
isInit(false),
envPath(envpath)
@@ -202,7 +204,7 @@
THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
}
if (usingJrnl()) {
- JournalImpl* jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
+ JournalImpl* jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
string("JournalData"), numJrnlFiles, jrnlFsizeSblks, defJournalGetEventsTimeout,
defJournalFlushTimeout);
queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
try {
// init will create the deque's for the init...
@@ -381,7 +383,7 @@
if (usingJrnl())
{
const char* queueName = queue->getName().c_str();
- JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName),
string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
+ JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName),
string("JournalData"), numJrnlFiles, jrnlFsizeSblks, defJournalGetEventsTimeout,
defJournalFlushTimeout);
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
try
@@ -982,8 +984,7 @@
usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call
as option
break;
case rhm::journal::RHM_IORES_FULL:
-// Temporary error msg till exception handler core problem solved...
-std::cerr << "Error storing message -- Journal full on queue \""
<< queue->getName() << "\"" << std::endl <<
std::flush;
+ std::cerr << "Error storing message -- Journal full on
queue \"" << queue->getName() << "\"." <<
std::endl << std::flush;
THROW_STORE_FULL_EXCEPTION("Error storing message -- Journal full
:" + queue->getName());
break;
default:
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-11-30 22:57:44 UTC (rev 1407)
@@ -76,6 +76,8 @@
IdSequence messageIdSequence;
static bool useAsync;
std::string storeDir;
+ const u_int16_t numJrnlFiles;
+ const u_int32_t jrnlFsizeSblks;
bool isInit;
const char* envPath;
static qpid::sys::Duration defJournalGetEventsTimeout;
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -37,9 +37,11 @@
JournalImpl::JournalImpl(const std::string& journalId,
const std::string& journalDirectory,
const std::string& journalBaseFilename,
+ const u_int16_t num_jfiles,
+ const u_int32_t jfsize_sblks,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout):
- jcntl(journalId, journalDirectory, journalBaseFilename),
+ jcntl(journalId, journalDirectory, journalBaseFilename,
num_jfiles, jfsize_sblks),
getEventsTimerSetFlag(false),
writeActivityFlag(false),
flushTriggeredFlag(true),
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-11-30 22:57:44 UTC (rev 1407)
@@ -85,6 +85,8 @@
JournalImpl(const std::string& journalId,
const std::string& journalDirectory,
const std::string& journalBaseFilename,
+ const u_int16_t num_jfiles,
+ const u_int32_t jfsize_sblks,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout);
virtual ~JournalImpl();
Modified: store/trunk/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -59,7 +59,9 @@
{}
deq_rec::~deq_rec()
-{}
+{
+ clean();
+}
void
deq_rec::reset()
@@ -274,12 +276,7 @@
if (_deq_hdr._xidsize)
{
_buff = ::malloc(_deq_hdr._xidsize);
- if (_buff == NULL)
- {
- std::stringstream ss;
- ss << "_buff malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(),
"deq_rec", "decode");
- }
+ MALLOC_CHK(_buff, "_buff", "deq_rec",
"decode");
const u_int32_t hdr_xid_dblks = size_dblks(deq_hdr::size() +
_deq_hdr._xidsize);
const u_int32_t hdr_xid_tail_dblks = size_dblks(deq_hdr::size() +
_deq_hdr._xidsize +
rec_tail::size());
@@ -347,12 +344,7 @@
if (_deq_hdr._xidsize)
{
_buff = ::malloc(_deq_hdr._xidsize);
- if (_buff == NULL)
- {
- std::stringstream ss;
- ss << "_buff malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(),
"deq_rec", "rcv_decode");
- }
+ MALLOC_CHK(_buff, "_buff", "deq_rec",
"rcv_decode");
// Decode xid
ifsp->read((char*)_buff, _deq_hdr._xidsize);
size_t size_read = ifsp->gcount();
@@ -434,5 +426,11 @@
jrec::chk_tail(_deq_tail, _deq_hdr._hdr);
}
+void
+deq_rec::clean()
+{
+ // clean up allocated memory here
+}
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/deq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -92,6 +92,7 @@
virtual void chk_hdr() const throw (jexception);
virtual void chk_hdr(u_int64_t rid) const throw (jexception);
virtual void chk_tail() const throw (jexception);
+ virtual void clean();
}; // class deq_rec
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -65,7 +65,9 @@
{}
enq_rec::~enq_rec()
-{}
+{
+ clean();
+}
// Prepare instance for use in reading data from journal, where buf contains preallocated
space
// to receive data.
@@ -354,12 +356,7 @@
if (_enq_hdr._xidsize + (_enq_hdr.is_external() ? 0 : _enq_hdr._dsize))
{
_buff = ::malloc(_enq_hdr._xidsize + (_enq_hdr.is_external() ? 0 :
_enq_hdr._dsize));
- if (_buff == NULL)
- {
- std::stringstream ss;
- ss << "_buff malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(),
"enq_rec", "decode");
- }
+ MALLOC_CHK(_buff, "_buff", "enq_rec",
"decode");
const u_int32_t hdr_xid_dblks = size_dblks(enq_hdr::size() +
_enq_hdr._xidsize);
const u_int32_t hdr_data_dblks = size_dblks(enq_hdr::size() +
_enq_hdr._xidsize +
@@ -468,12 +465,7 @@
if (_enq_hdr._xidsize)
{
_buff = ::malloc(_enq_hdr._xidsize);
- if (_buff == NULL)
- {
- std::stringstream ss;
- ss << "_buff malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(),
"enq_rec", "rcv_decode");
- }
+ MALLOC_CHK(_buff, "_buff", "enq_rec",
"rcv_decode");
// Decode xid
ifsp->read((char*)_buff, _enq_hdr._xidsize);
size_t size_read = ifsp->gcount();
@@ -585,5 +577,11 @@
jrec::chk_tail(_enq_tail, _enq_hdr._hdr);
}
+void
+enq_rec::clean()
+{
+ // clean up allocated memory here
+}
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/enq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -107,6 +107,7 @@
void chk_hdr() const throw (jexception);
void chk_hdr(u_int64_t rid) const throw (jexception);
void chk_tail() const throw (jexception);
+ virtual void clean();
}; // class enq_rec
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/jcfg.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcfg.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/jcfg.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -62,8 +62,11 @@
*/
#define JRNL_DBLK_SIZE 128 ///< Data block size in bytes (CANNOT BE
LESS THAN 32!)
#define JRNL_SBLK_SIZE 4 ///< Disk softblock size in multiples of
JRNL_DBLK_SIZE
-#define JRNL_FILE_SIZE 3072 ///< Journal file size in softblocks excl.
file_hdr
-#define JRNL_NUM_FILES 8 ///< Number of journal files
+#define JRNL_MIN_FILE_SIZE 128 ///< Min. jrnl file size in sblks (excl.
file_hdr)
+#define JRNL_MIN_NUM_FILES 4 ///< Min. number of journal files
+// The following are now parameterized in the jcntl constructor, and no longer needed
here.
+//#define JRNL_FILE_SIZE 3072 ///< Journal file size in softblocks
excl. file_hdr
+//#define JRNL_NUM_FILES 8 ///< Number of journal files
#define JRNL_RMGR_PAGE_SIZE 128 ///< Journal page size in softblocks
#define JRNL_RMGR_PAGES 16 ///< Number of pages to use in wmgr
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -41,6 +41,7 @@
#include <jrnl/jinf.hpp>
#include <sstream>
#include <qpid/broker/PersistableMessage.h>
+#include <unistd.h>
namespace rhm
{
@@ -49,7 +50,8 @@
// Functions
-jcntl::jcntl(const std::string& jid, const std::string& jdir, const
std::string& base_filename):
+jcntl::jcntl(const std::string& jid, const std::string& jdir, const
std::string& base_filename,
+ const u_int16_t num_jfiles, const u_int32_t jfsize_sblks):
_jid(jid),
_jdir(jdir, base_filename),
_base_filename(base_filename),
@@ -57,13 +59,16 @@
_stop_flag(false),
_readonly_flag(false),
_autostop(true),
+ _num_jfiles(num_jfiles > JRNL_MIN_NUM_FILES ? num_jfiles : JRNL_MIN_NUM_FILES),
+ _jfsize_sblks(jfsize_sblks > JRNL_MIN_FILE_SIZE ? jfsize_sblks
:JRNL_MIN_FILE_SIZE),
_datafh(NULL),
_emap(),
_tmap(),
_rrfc(),
_wrfc(),
_rmgr(this, _emap, _tmap, _rrfc),
- _wmgr(this, _emap, _tmap, _wrfc)
+ _wmgr(this, _emap, _tmap, _wrfc),
+ _rcvdat(_num_jfiles)
{
pthread_mutex_init(&_mutex, NULL);
}
@@ -75,7 +80,7 @@
catch (const jexception& e) { std::cerr << e << std::endl; }
if (_datafh)
{
- for (u_int32_t i=0; i<JRNL_NUM_FILES; i++)
+ for (u_int32_t i=0; i<_num_jfiles; i++)
if (_datafh[i])
::delete _datafh[i];
::delete[] _datafh;
@@ -94,28 +99,28 @@
// TODO - place this in a finalize() fn? - see ~jcntl()...
if (_datafh)
{
- for (u_int32_t i=0; i<JRNL_NUM_FILES; i++)
+ for (u_int32_t i=0; i<_num_jfiles; i++)
if (_datafh[i])
::delete _datafh[i];
::delete[] _datafh;
}
- _datafh = ::new lfh*[JRNL_NUM_FILES];
+ _datafh = ::new lfh*[_num_jfiles];
// NULL the pointer array first because new() can throw exceptions
- ::memset(_datafh, 0, sizeof(lfh*) * JRNL_NUM_FILES);
- for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
+ ::memset(_datafh, 0, sizeof(lfh*) * _num_jfiles);
+ for (u_int16_t i=0; i<_num_jfiles; i++)
{
std::stringstream ss;
ss << _jdir << "/" << _base_filename;
- _datafh[i] = ::new lfh(ss.str(), i, NULL);
+ _datafh[i] = ::new lfh(ss.str(), i, _jfsize_sblks, NULL);
}
// TODO: Check the following comment/note (may be obsolete):
// NOTE: The write RFC must initialize first. This sets all the file handle object
// (lfh) counters and pointers for both read and write, since write activity
// constrains read activity (i.e. one can't read what has not yet been written).
- _wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh);
- _rrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh);
+ _wrfc.initialize(_num_jfiles, (nlfh**)_datafh);
+ _rrfc.initialize(_num_jfiles, (nlfh**)_datafh);
_rmgr.initialize(rdtoklp, rd_cb);
_wmgr.initialize(wdtoklp, wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
@@ -145,28 +150,28 @@
if (_datafh)
{
- for (u_int32_t i=0; i<JRNL_NUM_FILES; i++)
+ for (u_int32_t i=0; i<_num_jfiles; i++)
if (_datafh[i])
::delete _datafh[i];
::delete[] _datafh;
}
- _datafh = ::new lfh*[JRNL_NUM_FILES];
+ _datafh = ::new lfh*[_num_jfiles];
// NULL the pointer array first because new() can throw exceptions
- ::memset(_datafh, 0, sizeof(lfh*) * JRNL_NUM_FILES);
- for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
+ ::memset(_datafh, 0, sizeof(lfh*) * _num_jfiles);
+ for (u_int16_t i=0; i<_num_jfiles; i++)
{
std::stringstream ss;
ss << _jdir << "/" << _base_filename;
- _datafh[i] = ::new lfh(ss.str(), i, &_rcvdat);
+ _datafh[i] = ::new lfh(ss.str(), i, _jfsize_sblks, &_rcvdat);
}
// TODO: Check the following comment/note (may be obsolete):
// NOTE: The write RFC must initialize first. This sets all the file handle object
// (lfh) counters and pointers for both read and write, since write activity
// constrains read activity (i.e. one can't read what has not yet been written).
- _wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, &_rcvdat);
- _rrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, _rcvdat._ffid);
+ _wrfc.initialize(_num_jfiles, (nlfh**)_datafh, &_rcvdat);
+ _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
_rmgr.initialize(rdtoklp, rd_cb);
_wmgr.initialize(wdtoklp, wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
@@ -179,10 +184,10 @@
{
if (!_readonly_flag)
throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl",
"recover_complete");
- for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
+ for (u_int16_t i=0; i<_num_jfiles; i++)
_datafh[i]->reset(&_rcvdat);
- _wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, &_rcvdat);
- _rrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, _rcvdat._ffid);
+ _wrfc.initialize(_num_jfiles, (nlfh**)_datafh, &_rcvdat);
+ _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
_rmgr.recover_complete();
_readonly_flag = false;
//std::cout << "Journal revovery complete." << std::endl;
@@ -429,7 +434,7 @@
ss << "errno=" << errno;
throw jexception(jerrno::JERR__RTCLOCK, ss.str().c_str(), "jcntl",
"write_infofile");
}
- jinf ji(_jid, _jdir.dirname(), _base_filename, ts);
+ jinf ji(_jid, _jdir.dirname(), _base_filename, _num_jfiles, _jfsize_sblks, ts);
ji.write();
}
@@ -472,7 +477,7 @@
while (rcvr_get_next_record(fid, &ifs, rd));
// Check for journal full condition
- u_int16_t next_wr_fid = (rd._lfid + 1) % JRNL_NUM_FILES;
+ u_int16_t next_wr_fid = (rd._lfid + 1) % _num_jfiles;
if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
rd._full = true;
@@ -523,7 +528,7 @@
assert(xidp != NULL);
std::string xid((char*)xidp, er.xid_size());
_tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
- free(xidp);
+ ::free(xidp);
}
else
_emap.insert_fid(h._rid, fid);
@@ -555,7 +560,7 @@
assert(xidp != NULL);
std::string xid((char*)xidp, dr.xid_size());
_tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid,
false));
- free(xidp);
+ ::free(xidp);
}
else
{
@@ -603,7 +608,7 @@
if (itr->_enq_flag)
rd._enq_cnt_list[itr->_fid]--;
}
- free(xidp);
+ ::free(xidp);
}
break;
case RHM_JDAT_TXC_MAGIC:
@@ -633,8 +638,7 @@
rd._enq_cnt_list[fid]--;
}
}
-
- free(xidp);
+ ::free(xidp);
}
break;
case RHM_JDAT_EMPTY_MAGIC:
@@ -651,7 +655,7 @@
return false;
default:
// Is this the last file, if so, stop as this is the overwrite boundary.
- if (fid == (rd._ffid ? rd._ffid - 1 : JRNL_NUM_FILES - 1))
+ if (fid == (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1))
{
rd._lfid = fid;
rd._eo = read_pos;
@@ -677,7 +681,7 @@
rd._eo = ifsp->tellg(); // remember file offset before closing
rd._lfid = fid++;
ifsp->close();
- if (fid >= JRNL_NUM_FILES)
+ if (fid >= _num_jfiles)
{
fid = 0;
rd._owi = !rd._owi; // Flip owi
@@ -720,7 +724,7 @@
{
if (rd._ffid ? h.get_owi() == rd._owi : h.get_owi() != rd._owi) // Overwrite
indicator changed
{
- u_int16_t expected_fid = rd._ffid ? rd._ffid - 1 : JRNL_NUM_FILES - 1;
+ u_int16_t expected_fid = rd._ffid ? rd._ffid - 1 : _num_jfiles - 1;
if (fid == expected_fid)
{
rd._lfid = fid;
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -131,6 +131,8 @@
bool _autostop; ///< Autostop flag - stops journal when overrun
occurs
// Journal control structures
+ const u_int16_t _num_jfiles; ///< Number of journal files
+ const u_int32_t _jfsize_sblks; ///< Journal file size in sblks
lfh** _datafh; ///< Array of pointers to data file handles
enq_map _emap; ///< Enqueue map for low water mark management
txn_map _tmap; ///< Transaction map open transactions
@@ -139,7 +141,7 @@
rmgr _rmgr; ///< Read page manager which manages AIO
wmgr _wmgr; ///< Write page manager which manages AIO
rcvdat _rcvdat; ///< Recovery data used for recovery
- pthread_mutex_t _mutex; ///< Mutex for tread safety
+ pthread_mutex_t _mutex; ///< Mutex for thread safety
std::deque<rhm::journal::data_tok*> _aio_rd_cmpl_dtok_list; ///<
Internally mamanged deque
std::deque<rhm::journal::data_tok*> _aio_wr_cmpl_dtok_list; ///<
Internally mamanged deque
@@ -153,8 +155,11 @@
* \param jid A unique identifier for this journal instance.
* \param jdir The directory which will contain the journal files.
* \param base_filename The string which will be used to start all journal
filenames.
+ * \param num_jfiles The number of journal files to be created.
+ * \param jfsize_sblks The size of each journal file expressed in softblocks.
*/
- jcntl(const std::string& jid, const std::string& jdir, const
std::string& base_filename);
+ jcntl(const std::string& jid, const std::string& jdir, const
std::string& base_filename,
+ const u_int16_t num_jfiles, const u_int32_t jfsize_sblks);
/**
* \brief Destructor.
@@ -606,9 +611,12 @@
*/
inline const std::string& base_filename() const { return _base_filename; }
+ inline const u_int16_t num_jfiles() const { return _num_jfiles; }
+ inline const u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
+
protected:
/**
* \brief Check status of journal before allowing write operations.
Modified: store/trunk/cpp/lib/jrnl/jdir.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/jdir.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -135,11 +135,11 @@
while ((entry = ::readdir(dir)) != NULL)
{
// Ignore . and ..
- if (strcmp(entry->d_name, ".") != 0 &&
strcmp(entry->d_name, "..") != 0)
+ if (::strcmp(entry->d_name, ".") != 0 &&
::strcmp(entry->d_name, "..") != 0)
{
- if (strlen(entry->d_name) > base_filename.size())
+ if (::strlen(entry->d_name) > base_filename.size())
{
- if (strncmp(entry->d_name, base_filename.c_str(),
base_filename.size()) == 0)
+ if (::strncmp(entry->d_name, base_filename.c_str(),
base_filename.size()) == 0)
{
if (!found)
{
@@ -208,7 +208,7 @@
// Read jinf file, then verify all journal files are present
jinf ji(dirname + "/" + base_filename + "." +
JRNL_INFO_EXTENSION, true);
- for (u_int16_t fnum=0; fnum < ji.num_files(); fnum++)
+ for (u_int16_t fnum=0; fnum < ji.num_jfiles(); fnum++)
{
std::stringstream ss;
ss << dirname << "/" << base_filename <<
".";
@@ -334,15 +334,16 @@
while ((entry = ::readdir(dir)) != NULL)
{
// Ignore . and ..
- if (strcmp(entry->d_name, ".") != 0 &&
strcmp(entry->d_name, "..") != 0)
+ if (::strcmp(entry->d_name, ".") != 0 &&
::strcmp(entry->d_name, "..") != 0)
{
- if (strlen(entry->d_name) == base_filename.size() + 10) // Format:
basename.bak.XXXX
+ if (::strlen(entry->d_name) == base_filename.size() + 10) // Format:
basename.bak.XXXX
{
std::stringstream ss;
ss << "_" << base_filename <<
".bak.";
- if (strncmp(entry->d_name, ss.str().c_str(), base_filename.size() + 6)
== 0)
+ if (::strncmp(entry->d_name, ss.str().c_str(), base_filename.size() +
6) == 0)
{
- long this_dir_num = strtol(entry->d_name + base_filename.size() +
6, NULL, 16);
+ long this_dir_num = ::strtol(entry->d_name + base_filename.size()
+ 6,
+ NULL, 16);
if (this_dir_num > dir_num)
dir_num = this_dir_num;
}
Modified: store/trunk/cpp/lib/jrnl/jexception.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jexception.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/jexception.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -47,6 +47,15 @@
#include <string>
#include <sys/types.h>
+// Macro for formatting commom system errors
+#define FORMAT_SYSERR(errno) " errno=" << errno << " ("
<< strerror(errno) << ")"
+#define MALLOC_CHK(ptr, var, cls, fn) if (ptr == NULL) { \
+ clean(); \
+ std::stringstream ss; \
+ ss << var << ": malloc() failed: " <<
FORMAT_SYSERR(errno); \
+ throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), cls, fn); \
+ }
+
#define MAX_MSG_SIZE 1024
#define MAX_THROWING_SIZE 128
Modified: store/trunk/cpp/lib/jrnl/jinf.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/jinf.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -45,8 +45,8 @@
jinf::jinf(const std::string& jinf_filename, bool validate_flag) throw (jexception):
_jver(0),
- _num_files(0),
- _fsize_sblks(0),
+ _num_jfiles(0),
+ _jfsize_sblks(0),
_sblk_size_dblks(0),
_dblk_size(0),
_wmgr_page_size_dblks(0),
@@ -65,14 +65,14 @@
}
jinf::jinf(const std::string& jid, const std::string& jdir, const
std::string& base_filename,
- const timespec& ts):
+ const u_int16_t num_jfiles, const u_int32_t jfsize_sblks, const timespec&
ts):
_jver(RHM_JDAT_VERSION),
_jid(jid),
_jdir(jdir),
_base_filename(base_filename),
_ts(ts),
- _num_files(JRNL_NUM_FILES),
- _fsize_sblks(JRNL_FILE_SIZE),
+ _num_jfiles(num_jfiles),
+ _jfsize_sblks(jfsize_sblks),
_sblk_size_dblks(JRNL_SBLK_SIZE),
_dblk_size(JRNL_DBLK_SIZE),
_wmgr_page_size_dblks(JRNL_WMGR_PAGE_SIZE),
@@ -100,16 +100,16 @@
ss << "; required=" << RHM_JDAT_VERSION <<
std::endl;
err = true;
}
- if (_num_files != JRNL_NUM_FILES)
+ if (_num_jfiles < JRNL_MIN_NUM_FILES)
{
- ss << "JRNL_NUM_FILES mismatch: " << _num_files;
- ss << "; required=" << JRNL_NUM_FILES << std::endl;
+ ss << "Number of journal files too small: found=" <<
_num_jfiles;
+ ss << "; minimum=" << JRNL_MIN_NUM_FILES <<
std::endl;
err = true;
}
- if (_fsize_sblks != JRNL_FILE_SIZE)
+ if (_jfsize_sblks < JRNL_MIN_FILE_SIZE)
{
- ss << "JRNL_FILE_SIZE mismatch: " << _fsize_sblks;
- ss << "; required=" << JRNL_FILE_SIZE << std::endl;
+ ss << "Journal file size too small: found=" <<
_jfsize_sblks;
+ ss << "; minimum=" << JRNL_MIN_FILE_SIZE << "
(sblks)" << std::endl;
err = true;
}
if (_sblk_size_dblks != JRNL_SBLK_SIZE)
@@ -162,7 +162,7 @@
if (!_valid_flag)
validate();
- for (u_int16_t fnum=0; fnum < _num_files; fnum++)
+ for (u_int16_t fnum=0; fnum < _num_jfiles; fnum++)
{
std::stringstream ss;
ss << _jdir << "/" << _base_filename <<
".";
@@ -235,8 +235,8 @@
ss << " Journal directory: \"" << _jdir <<
"\"" << std::endl;
ss << " Journal base filename: \"" << _base_filename
<< "\"" << std::endl;
ss << " Journal version: " << (unsigned)_jver <<
std::endl;
- ss << " Number of journal files (JRNL_NUM_FILES): " <<
_num_files << std::endl;
- ss << " Journal file size (JRNL_FILE_SIZE): " << _fsize_sblks
<< " sblks" << std::endl;
+ ss << " Number of journal files: " << _num_jfiles <<
std::endl;
+ ss << " Journal file size: " << _jfsize_sblks << "
sblks" << std::endl;
ss << " Softblock size (JRNL_SBLK_SIZE): " << _sblk_size_dblks
<< " dblks" << std::endl;
ss << " Datablock size (JRNL_DBLK_SIZE): " << _dblk_size
<< " bytes" << std::endl;
ss << " Write page size (JRNL_WMGR_PAGE_SIZE): " <<
_wmgr_page_size_dblks << " dblks" <<
@@ -273,8 +273,8 @@
ss << "\" />" << std::endl;
ss << " </creation_time>" << std::endl;
ss << " <journal_file_geometry>" << std::endl;
- ss << " <JRNL_NUM_FILES value=\"" << _num_files
<< "\" />" << std::endl;
- ss << " <JRNL_FILE_SIZE value=\"" << _fsize_sblks
<< "\" />" << std::endl;
+ ss << " <number_jrnl_files value=\"" << _num_jfiles
<< "\" />" << std::endl;
+ ss << " <jrnl_file_size_sblks value=\"" <<
_jfsize_sblks << "\" />" << std::endl;
ss << " <JRNL_SBLK_SIZE value=\"" <<
_sblk_size_dblks << "\" />" << std::endl;
ss << " <JRNL_DBLK_SIZE value=\"" << _dblk_size
<< "\" />" << std::endl;
ss << " <JRNL_WMGR_PAGE_SIZE value=\"" <<
_wmgr_page_size_dblks << "\" />" << std::endl;
@@ -307,10 +307,10 @@
string_value(_jdir, buff);
else if(::strstr(buff, "base_filename"))
string_value(_base_filename, buff);
- else if(::strstr(buff, "JRNL_NUM_FILES"))
- _num_files = u_int16_value(buff);
- else if(::strstr(buff, "JRNL_FILE_SIZE"))
- _fsize_sblks = u_int32_value(buff);
+ else if(::strstr(buff, "number_jrnl_files"))
+ _num_jfiles = u_int16_value(buff);
+ else if(::strstr(buff, "jrnl_file_size_sblks"))
+ _jfsize_sblks = u_int32_value(buff);
else if(::strstr(buff, "JRNL_SBLK_SIZE"))
_sblk_size_dblks = u_int16_value(buff);
else if(::strstr(buff, "JRNL_DBLK_SIZE"))
Modified: store/trunk/cpp/lib/jrnl/jinf.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/jinf.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -52,8 +52,8 @@
std::string _jdir;
std::string _base_filename;
timespec _ts;
- u_int16_t _num_files;
- u_int32_t _fsize_sblks;
+ u_int16_t _num_jfiles;
+ u_int32_t _jfsize_sblks;
u_int16_t _sblk_size_dblks;
u_int32_t _dblk_size;
u_int32_t _wmgr_page_size_dblks;
@@ -71,7 +71,7 @@
jinf(const std::string& jinf_filename, bool validate_flag) throw
(jexception);
// constructor for writing jinf file
jinf(const std::string& jid, const std::string& jdir, const
std::string& base_filename,
- const timespec& ts);
+ const u_int16_t num_jfiles, const u_int32_t jfsize_sblks, const timespec&
ts);
virtual ~jinf();
void validate() throw (jexception);
@@ -83,8 +83,8 @@
inline const std::string& jdir() const { return _jdir; }
inline const std::string& base_filename() const { return _base_filename; }
inline const timespec& ts() const { return _ts; }
- inline const u_int16_t num_files() const { return _num_files; }
- inline const u_int32_t fsize_sblks() const { return _fsize_sblks; }
+ inline const u_int16_t num_jfiles() const { return _num_jfiles; }
+ inline const u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
inline const u_int16_t sblk_size_dblks() const { return _sblk_size_dblks; }
inline const u_int32_t dblk_size() const { return _dblk_size; }
inline const u_int32_t wmgr_page_size_dblks() const { return
_wmgr_page_size_dblks; }
Modified: store/trunk/cpp/lib/jrnl/jrec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jrec.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/jrec.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -172,6 +172,7 @@
static void chk_hdr(const hdr& hdr) throw (jexception);
static void chk_rid(const hdr& hdr, u_int64_t rid) throw (jexception);
static void chk_tail(const rec_tail& tail, const hdr& hdr) throw
(jexception);
+ virtual void clean() = 0;
}; // class jrec
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/lfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/lfh.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/lfh.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -38,13 +38,13 @@
namespace journal
{
-lfh::lfh():
- nlfh()
+lfh::lfh(const u_int32_t jfsize_sblks):
+ nlfh(jfsize_sblks)
{}
-lfh::lfh(const std::string& fbasename, const u_int16_t fid, rcvdat const * const ro)
- throw (jexception*):
- nlfh(fbasename, fid, ro)
+lfh::lfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t
jfsize_sblks,
+ rcvdat const * const ro) throw (jexception*):
+ nlfh(fbasename, fid, jfsize_sblks, ro)
{}
lfh::~lfh()
Modified: store/trunk/cpp/lib/jrnl/lfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/lfh.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/lfh.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -55,9 +55,9 @@
class lfh : public nlfh
{
public:
- lfh();
- lfh(const std::string& fbasename, const u_int16_t fid, rcvdat const * const
ro)
- throw (jexception*);
+ lfh(const u_int32_t jfsize_sblks);
+ lfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t
jfsize_sblks,
+ rcvdat const * const ro) throw (jexception*);
virtual ~lfh();
};
Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -45,52 +45,54 @@
namespace journal
{
-nlfh::nlfh():
+nlfh::nlfh(const u_int32_t jfsize_sblks):
_fname(),
_fid(0),
+ _ffull_dblks(JRNL_SBLK_SIZE * (jfsize_sblks + 1)),
_rd_fh(-1),
_wr_fh(-1),
_rec_enqcnt(0),
_rd_subm_cnt_dblks(0),
_rd_cmpl_cnt_dblks(0),
#ifdef RHM_RDONLY
- _wr_subm_cnt_dblks(JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1)),
- _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1))
+ _wr_subm_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1)),
+ _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1))
#else
_wr_subm_cnt_dblks(0),
_wr_cmpl_cnt_dblks(0)
#endif
{}
-nlfh::nlfh(const std::string& fbasename, const u_int16_t fid, const rcvdat* const
ro)
- throw (jexception):
+nlfh::nlfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t
jfsize_sblks,
+ const rcvdat* const ro) throw (jexception):
_fname(),
_fid(fid),
+ _ffull_dblks(JRNL_SBLK_SIZE * (jfsize_sblks + 1)),
_rd_fh(-1),
_wr_fh(-1),
_rec_enqcnt(0),
_rd_subm_cnt_dblks(0),
_rd_cmpl_cnt_dblks(0),
#ifdef RHM_RDONLY
- _wr_subm_cnt_dblks(JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1)),
- _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1))
+ _wr_subm_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1)),
+ _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1))
#else
_wr_subm_cnt_dblks(0),
_wr_cmpl_cnt_dblks(0)
#endif
{
- initialize(fbasename, fid, ro);
- open();
+ initialize(fbasename, fid, jfsize_sblks, ro);
+ open_fh();
}
nlfh::~nlfh()
{
- close();
+ close_fh();
}
void
nlfh::initialize(const std::string& fbasename, const u_int16_t fid,
- const rcvdat* const ro) throw (jexception)
+ const u_int32_t jfsize_sblks, const rcvdat* const ro) throw (jexception)
{
std::stringstream ss;
@@ -122,8 +124,8 @@
}
else
{
- _wr_subm_cnt_dblks = JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1);
- _wr_cmpl_cnt_dblks = JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1);
+ _wr_subm_cnt_dblks = _ffull_dblks;
+ _wr_cmpl_cnt_dblks = _ffull_dblks;
}
_rec_enqcnt = ro->_enq_cnt_list[_fid];
}
@@ -134,10 +136,10 @@
const size_t writesize = sblksize * JRNL_WMGR_PAGE_SIZE;
// NOTE: The journal file size is always one sblock bigger than the specified
journal
- // file size (JRNL_FILE_SIZE in jcfg.hpp), which is the data content size.
The extra
+ // file size, which is the data content size. The extra
// block is for the journal file header which precedes all data on each file
and is
// exactly one softblock in size.
- u_int32_t nsblks = JRNL_FILE_SIZE + 1;
+ u_int32_t nsblks = jfsize_sblks + 1;
void* nullbuf = NULL;
if (::posix_memalign(&nullbuf, sblksize, writesize))
{
@@ -204,8 +206,8 @@
}
else
{
- _wr_subm_cnt_dblks = JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1);
- _wr_cmpl_cnt_dblks = JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1);
+ _wr_subm_cnt_dblks = _ffull_dblks;
+ _wr_cmpl_cnt_dblks = _ffull_dblks;
}
_rec_enqcnt = ro->_enq_cnt_list[_fid];
return true;
@@ -287,11 +289,11 @@
const u_int32_t
nlfh::add_wr_subm_cnt_dblks(u_int32_t a) throw (jexception)
{
- if (_wr_subm_cnt_dblks + a > JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1)) // Allow for
file header
+ if (_wr_subm_cnt_dblks + a > _ffull_dblks) // Allow for file header
{
std::stringstream ss;
ss << "_wr_subm_cnt_dblks=" << _wr_subm_cnt_dblks <<
" incr=" << a;
- ss << " fsize=" << JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1)
<< " dblks";
+ ss << " fsize=" << _ffull_dblks << "
dblks";
throw jexception(jerrno::JERR_NLFH_FILEOFFSOVFL, ss.str().c_str(),
"nlfh",
"add_wr_subm_cnt_dblks");
}
@@ -328,7 +330,7 @@
// Private functions
void
-nlfh::open() throw (jexception)
+nlfh::open_fh() throw (jexception)
{
_rd_fh = ::open(_fname.c_str(), O_RDONLY | O_DIRECT);
if (_rd_fh < 0)
@@ -348,7 +350,7 @@
}
void
-nlfh::close()
+nlfh::close_fh()
{
if (_rd_fh >= 0)
{
@@ -362,9 +364,5 @@
}
}
-// static const definition
-
-const u_int32_t nlfh::_fsize_dblks = JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1);
-
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/nlfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/nlfh.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -58,9 +58,9 @@
class nlfh
{
protected:
- static const u_int32_t _fsize_dblks; ///< File size in dblks
std::string _fname; ///< File name
u_int16_t _fid; ///< File ID (ordinal number in ring buffer)
+ const u_int32_t _ffull_dblks; ///< File size in dblks (incl. file header)
int _rd_fh; ///< Read file handle
int _wr_fh; ///< Write file handle
u_int32_t _rec_enqcnt; ///< Count of enqueued records
@@ -70,14 +70,14 @@
u_int32_t _wr_cmpl_cnt_dblks; ///< Write file count (data blocks) for
completed AIO
public:
- nlfh();
+ nlfh(const u_int32_t jfsize_sblks);
// Constructors with implicit initialize() and open()
- nlfh(const std::string& fbasename, const u_int16_t fid, const rcvdat* const
ro)
- throw (jexception);
+ nlfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t
jfsize_sblks,
+ const rcvdat* const ro) throw (jexception);
virtual ~nlfh();
virtual void initialize(const std::string& fbasename, const u_int16_t fid,
- const rcvdat* const ro) throw (jexception);
+ const u_int32_t jfsize_sblks, const rcvdat* const ro) throw
(jexception);
virtual bool reset(const rcvdat* const ro = NULL);
inline const std::string& fname() const { return _fname; }
@@ -120,9 +120,9 @@
inline const bool wr_empty() const { return _wr_subm_cnt_dblks == 0; }
inline const u_int32_t wr_remaining_dblks() const
- { return _fsize_dblks - _wr_subm_cnt_dblks; }
- inline const bool is_wr_full() const { return _fsize_dblks == _wr_subm_cnt_dblks;
}
- inline const bool is_wr_compl() const { return _fsize_dblks ==
_wr_cmpl_cnt_dblks; }
+ { return _ffull_dblks - _wr_subm_cnt_dblks; }
+ inline const bool is_wr_full() const { return _ffull_dblks == _wr_subm_cnt_dblks;
}
+ inline const bool is_wr_compl() const { return _ffull_dblks ==
_wr_cmpl_cnt_dblks; }
inline const u_int32_t wr_aio_outstanding_dblks() const
{ return _wr_subm_cnt_dblks - _wr_cmpl_cnt_dblks; }
inline const bool wr_file_rotate() const { return is_wr_full(); }
@@ -131,8 +131,8 @@
const std::string& status_str(std::string& s) const;
protected:
- virtual void open() throw (jexception);
- virtual void close();
+ virtual void open_fh() throw (jexception);
+ virtual void close_fh();
};
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -151,31 +151,16 @@
}
// 2. Allocate array of page pointers
_page_ptr_arr = (void**)::malloc(_pages * sizeof(void*));
- if (_page_ptr_arr == NULL)
- {
- clean();
- ss << "_page_ptr_arr malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "pmgr",
"initialize");
- }
+ MALLOC_CHK(_page_ptr_arr, "_page_ptr_arr", "pmgr",
"initialize");
// 3. Allocate and initilaize page control block (page_cb) array
_page_cb_arr = (page_cb*)::malloc(_pages * sizeof(page_cb));
- if (_page_cb_arr == NULL)
- {
- clean();
- ss << "_page_cb_arr malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "pmgr",
"initialize");
- }
+ MALLOC_CHK(_page_cb_arr, "_page_cb_arr", "pmgr",
"initialize");
::memset(_page_cb_arr, 0, _pages * sizeof(page_cb));
// 5. Allocate IO control block (iocb) array
_iocb_arr = (iocb*)::malloc(_pages * sizeof(iocb));
- if (_iocb_arr == NULL)
- {
- clean();
- ss << "_iocb_arr malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "pmgr",
"initialize");
- }
+ MALLOC_CHK(_iocb_arr, "_iocb_arr", "pmgr",
"initialize");
// 6. Set page pointers in _page_ptr_arr, _page_cb_arr and iocbs to pages within page
block
for (u_int16_t i=0; i<_pages; i++)
@@ -192,12 +177,7 @@
// 7. Allocate io_event array, max one event per cache page in rmgr and wmgr
const u_int16_t max_aio_evts = JRNL_RMGR_PAGES + JRNL_WMGR_PAGES;
_ioevt_arr = (io_event*)::malloc(max_aio_evts * sizeof(io_event));
- if (_ioevt_arr == NULL)
- {
- clean();
- ss << "_ioevt_arr malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "pmgr",
"initialize");
- }
+ MALLOC_CHK(_ioevt_arr, "_ioevt_arr", "pmgr",
"initialize");
// 8. Initialize AIO context
if (int ret = ::io_queue_init(max_aio_evts, &_ioctx))
@@ -207,23 +187,10 @@
}
}
-// bool
-// pmgr::rotate_page(page_state state)
-// {
-// _page_cb_arr[_pg_index]._state = state;
-// if (_pg_offset_dblks >= JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
-// {
-// _pg_offset_dblks = 0;
-// _pg_cntr++;
-// }
-// if (++_pg_index >= _pages)
-// _pg_index = 0;
-// return false;
-// }
-
void
pmgr::clean()
{
+ // clean up allocated memory here
if (_ioctx)
::io_queue_release(_ioctx);
if (_page_base_ptr)
Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -52,7 +52,7 @@
bool _full; ///< Journal is full
std::vector<u_int32_t> _enq_cnt_list; ///< Number enqueued records
found for each file
- rcvdat():
+ rcvdat(u_int16_t num_jfiles):
_owi(false),
_empty(true),
_ffid(0),
@@ -61,7 +61,7 @@
_eo(0),
_h_rid(0),
_full(false),
- _enq_cnt_list(JRNL_NUM_FILES, 0)
+ _enq_cnt_list(num_jfiles, 0)
{}
void reset()
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -766,7 +766,7 @@
// This counter is for bookkeeping only, page rotates are handled directly in
init_aio_reads()
// FIXME: _pg_cntr should be sync'd with aio ops, not use of page as it is
now...
// Need to move reset into if (_rrfc.file_rotate()) above.
- if (_pg_cntr >= (JRNL_FILE_SIZE / JRNL_RMGR_PAGE_SIZE))
+ if (_pg_cntr >= (_jc->jfsize_sblks() / JRNL_RMGR_PAGE_SIZE))
_pg_cntr = 0;
}
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -58,7 +58,7 @@
class rmgr : public pmgr
{
private:
- rrfc& _rrfc; ///< Ref to write rotating file controller
+ rrfc& _rrfc; ///< Ref to read rotating file controller
hdr _hdr; ///< Header used to determind record type
public:
Modified: store/trunk/cpp/lib/jrnl/txn_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/txn_rec.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -61,7 +61,9 @@
{}
txn_rec::~txn_rec()
-{}
+{
+ clean();
+}
void
txn_rec::reset(const u_int32_t magic)
@@ -270,12 +272,7 @@
rd_cnt = _txn_hdr.size();
chk_hdr();
_buff = ::malloc(_txn_hdr._xidsize);
- if (_buff == NULL)
- {
- std::stringstream ss;
- ss << "_buff malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "txn_rec",
"decode");
- }
+ MALLOC_CHK(_buff, "_buff", "txn_rec", "decode");
const u_int32_t hdr_xid_dblks = size_dblks(txn_hdr::size() + _txn_hdr._xidsize);
const u_int32_t hdr_xid_tail_dblks = size_dblks(txn_hdr::size() +
_txn_hdr._xidsize +
rec_tail::size());
@@ -339,12 +336,7 @@
ifsp->ignore(sizeof(u_int32_t)); // _filler0
#endif
_buff = ::malloc(_txn_hdr._xidsize);
- if (_buff == NULL)
- {
- std::stringstream ss;
- ss << "_buff malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "deq_rec",
"rcv_decode");
- }
+ MALLOC_CHK(_buff, "_buff", "txn_rec",
"rcv_decode");
// Decode xid
ifsp->read((char*)_buff, _txn_hdr._xidsize);
size_t size_read = ifsp->gcount();
@@ -427,5 +419,11 @@
jrec::chk_tail(_txn_tail, _txn_hdr._hdr);
}
+void
+txn_rec::clean()
+{
+ // clean up allocated memory here
+}
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/txn_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/txn_rec.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -90,6 +90,7 @@
void chk_hdr() const throw (jexception);
void chk_hdr(u_int64_t rid) const throw (jexception);
void chk_tail() const throw (jexception);
+ virtual void clean();
}; // class txn_rec
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -35,6 +35,7 @@
#include <assert.h>
#include <cerrno>
#include <sstream>
+#include <jrnl/jcntl.hpp>
#include <jrnl/jerrno.hpp>
namespace rhm
@@ -51,6 +52,8 @@
_fhdr_ptr_arr(NULL),
_iocba(NULL),
_cached_offset_dblks(0),
+ _jfsize_dblks(_jc->jfsize_sblks() * JRNL_SBLK_SIZE),
+ _jfsize_pgs(_jc->jfsize_sblks() / JRNL_WMGR_PAGE_SIZE),
_enq_busy(false),
_deq_busy(false),
_abort_busy(false),
@@ -68,6 +71,8 @@
_fhdr_ptr_arr(NULL),
_iocba(NULL),
_cached_offset_dblks(0),
+ _jfsize_dblks(_jc->jfsize_sblks() * JRNL_SBLK_SIZE),
+ _jfsize_pgs(_jc->jfsize_sblks() / JRNL_WMGR_PAGE_SIZE),
_enq_busy(false),
_deq_busy(false),
_abort_busy(false),
@@ -182,8 +187,8 @@
if (_wrfc.empty())
{
u_int32_t rec_dblks_rem = _enq_rec.rec_size_dblks() - data_offs_dblks;
- bool file_fit = rec_dblks_rem <= JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
- bool file_full = rec_dblks_rem == JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
+ bool file_fit = rec_dblks_rem <= _jfsize_dblks;
+ bool file_full = rec_dblks_rem == _jfsize_dblks;
size_t fro = 0;
if (cont)
{
@@ -209,7 +214,7 @@
}
// File full?
- if (_pg_cntr >= (JRNL_FILE_SIZE / JRNL_WMGR_PAGE_SIZE))
+ if (_pg_cntr >= _jfsize_pgs)
{
iores rfres = rotate_file();
if (rfres != RHM_IORES_SUCCESS)
@@ -332,8 +337,8 @@
if (_wrfc.empty())
{
u_int32_t rec_dblks_rem = _deq_rec.rec_size_dblks() - data_offs_dblks;
- bool file_fit = rec_dblks_rem <= JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
- bool file_full = rec_dblks_rem == JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
+ bool file_fit = rec_dblks_rem <= _jfsize_dblks;
+ bool file_full = rec_dblks_rem == _jfsize_dblks;
size_t fro = 0;
if (cont)
{
@@ -359,7 +364,7 @@
}
// File full?
- if (_pg_cntr >= (JRNL_FILE_SIZE / JRNL_WMGR_PAGE_SIZE))
+ if (_pg_cntr >= _jfsize_pgs)
{
iores rfres = rotate_file();
if (rfres != RHM_IORES_SUCCESS)
@@ -478,8 +483,8 @@
if (_wrfc.empty())
{
u_int32_t rec_dblks_rem = _txn_rec.rec_size_dblks() - data_offs_dblks;
- bool file_fit = rec_dblks_rem <= JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
- bool file_full = rec_dblks_rem == JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
+ bool file_fit = rec_dblks_rem <= _jfsize_dblks;
+ bool file_full = rec_dblks_rem == _jfsize_dblks;
size_t fro = 0;
if (cont)
{
@@ -505,7 +510,7 @@
}
// File full?
- if (_pg_cntr >= (JRNL_FILE_SIZE / JRNL_WMGR_PAGE_SIZE))
+ if (_pg_cntr >= _jfsize_pgs)
{
iores rfres = rotate_file();
if (rfres != RHM_IORES_SUCCESS)
@@ -620,8 +625,8 @@
if (_wrfc.empty())
{
u_int32_t rec_dblks_rem = _txn_rec.rec_size_dblks() - data_offs_dblks;
- bool file_fit = rec_dblks_rem <= JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
- bool file_full = rec_dblks_rem == JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
+ bool file_fit = rec_dblks_rem <= _jfsize_dblks;
+ bool file_full = rec_dblks_rem == _jfsize_dblks;
size_t fro = 0;
if (cont)
{
@@ -647,7 +652,7 @@
}
// File full?
- if (_pg_cntr >= (JRNL_FILE_SIZE / JRNL_WMGR_PAGE_SIZE))
+ if (_pg_cntr >= _jfsize_pgs)
{
iores rfres = rotate_file();
if (rfres != RHM_IORES_SUCCESS)
@@ -676,7 +681,7 @@
wmgr::flush()
{
iores res = write_flush();
- if (_pg_cntr >= (JRNL_FILE_SIZE / JRNL_WMGR_PAGE_SIZE))
+ if (_pg_cntr >= _jfsize_pgs)
{
iores rfres = rotate_file();
if (rfres != RHM_IORES_SUCCESS)
@@ -886,8 +891,9 @@
void
wmgr::initialize() throw (jexception)
{
+ const u_int16_t num_jfiles = _jc->num_jfiles();
pmgr::initialize();
- if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize * JRNL_NUM_FILES))
+ if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize * num_jfiles))
{
clean();
std::stringstream ss;
@@ -896,24 +902,12 @@
throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "wmgr",
"initialize");
}
- _fhdr_ptr_arr = (void**)::malloc(_pages * sizeof(void*));
+ _fhdr_ptr_arr = (void**)::malloc(num_jfiles * sizeof(void*)); // one slot per journal file, matching the fill loop below
- if (_fhdr_ptr_arr == NULL)
+ MALLOC_CHK(_fhdr_ptr_arr, "_fhdr_ptr_arr", "wmgr",
"initialize");
+ _iocba = (iocb**)::malloc(sizeof(iocb*) * num_jfiles);
+ MALLOC_CHK(_iocba, "_iocba", "wmgr", "initialize");
+ ::memset(_iocba, 0, sizeof(iocb*) * num_jfiles);
+ for (u_int16_t i=0; i<num_jfiles; i++)
{
- clean();
- std::stringstream ss;
- ss << "_fhdr_ptr_arr malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "wmgr",
"initialize");
- }
- _iocba = (iocb**)::malloc(sizeof(iocb*) * JRNL_NUM_FILES);
- if (_iocba == NULL)
- {
- clean();
- std::stringstream ss;
- ss << "_iocba malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "wmgr",
"initialize");
- }
- ::memset(_iocba, 0, sizeof(iocb*) * JRNL_NUM_FILES);
- for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
- {
_fhdr_ptr_arr[i] = (void*)((char*)_fhdr_base_ptr + _sblksize * i);
_iocba[i] = ::new iocb;
}
@@ -1044,6 +1038,7 @@
void
wmgr::clean()
{
+ // clean up allocated memory here
if (_fhdr_base_ptr)
{
::free(_fhdr_base_ptr);
@@ -1056,7 +1051,7 @@
}
if (_iocba)
{
- for (u_int32_t i=0; i<JRNL_NUM_FILES; i++)
+ for (u_int32_t i=0; i<_jc->num_jfiles(); i++)
if (_iocba[i])
::delete _iocba[i];
::free(_iocba);
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -77,6 +77,8 @@
iocb** _iocba; ///< Array of iocb pointers for file header
writes
u_int32_t _cached_offset_dblks; ///< Amount of unwritten data in page
(dblocks)
std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list
+ const u_int32_t _jfsize_dblks; ///< Journal file size in dblks (NOT sblks!)
+ const u_int32_t _jfsize_pgs; ///< Journal file size in cache pages
// TODO: Convert _enq_busy etc into a proper threadsafe lock
// TODO: Convert to enum? Are these encodes mutually exclusive?
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -39,6 +39,8 @@
#define AIO_SLEEP_TIME 1000
#define MSG_SIZE 100
#define XID_SIZE 64
+#define NUM_JFILES 4
+#define JFSIZE_SBLKS 96
using namespace std;
@@ -48,7 +50,7 @@
try
{
char* test_name = "InstantiationTest";
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
}
catch (const rhm::journal::jexception& e)
{
@@ -64,7 +66,7 @@
try
{
char* test_name = "InitializationTest";
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.initialize();
}
catch (const rhm::journal::jexception& e)
@@ -84,15 +86,15 @@
u_int64_t highest_rid;
char* test_name = "EmptyRecoverTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.initialize();
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
jc.recover_complete();
}
@@ -111,7 +113,7 @@
try
{
char* test_name = "EnqueueTest";
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.initialize();
// Non-txn
@@ -142,13 +144,13 @@
// Non-txn
char* test_name = "RecoverReadTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
@@ -162,7 +164,7 @@
// Txn
test_name = "TxnRecoverReadTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.initialize();
create_xid(xid, 1, XID_SIZE);
txn_list.push_back(xid);
@@ -171,7 +173,7 @@
txn_commit(&jc, xid);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
@@ -200,13 +202,13 @@
// Non-txn
char* test_name = "RecoveredReadTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
@@ -228,7 +230,7 @@
// Txn
test_name = "TxnRecoveredReadTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.initialize();
create_xid(xid, 2, XID_SIZE);
txn_list.push_back(xid);
@@ -237,7 +239,7 @@
txn_commit(&jc, xid);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
@@ -274,13 +276,13 @@
// Non-txn
char* test_name = "RecoveredDequeueTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
@@ -304,7 +306,7 @@
// Txn
test_name = "TxnRecoveredDequeueTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.initialize();
create_xid(xid, 3, XID_SIZE);
txn_list.push_back(xid);
@@ -313,7 +315,7 @@
txn_commit(&jc, xid);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
@@ -352,7 +354,7 @@
// Non-txn
char* test_name = "FlagsRecoverdTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.initialize();
// Transient msgs - should not recover
for (int m=0; m<NUM_MSGS; m++)
@@ -368,7 +370,7 @@
enq_extern_msg(&jc, false);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
// Recover non-transient msgs
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
@@ -421,7 +423,7 @@
// Txn
test_name = "TxnFlagsRecoverdTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.initialize();
create_xid(xid, 4, XID_SIZE);
txn_list.push_back(xid);
@@ -440,7 +442,7 @@
txn_commit(&jc, xid);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
// Recover non-transient msgs
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
@@ -508,7 +510,7 @@
// Non-txn
char* test_name = "ComplexRecoveryTest1";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.initialize();
// Enqueue 2n, then dequeue first n msgs; check that only last n readable
@@ -528,7 +530,7 @@
}
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
// Check that only last n readable (as before)
@@ -565,7 +567,7 @@
// Txn
test_name = "TxnComplexRecoveryTest1";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.initialize();
// Enqueue 2n, then dequeue first n msgs; check that only last n readable
@@ -592,7 +594,7 @@
}
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES,
JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
// Check that only last n readable (as before)
Modified: store/trunk/cpp/tests/jrnl/unit_test_jdir.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/unit_test_jdir.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/tests/jrnl/unit_test_jdir.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -38,13 +38,16 @@
#include <iomanip>
#include <iostream>
#include <jrnl/file_hdr.hpp>
+#include <jrnl/jcfg.hpp>
#include <jrnl/jdir.hpp>
#include <jrnl/jerrno.hpp>
#include <jrnl/jexception.hpp>
#include <sys/stat.h>
+#define NUM_JFILES 4
+#define JFSIZE_SBLKS 128
+
#define ERRORSTR(e) ::strerror(ret) << " (" << ret <<
")"
-#define NUM_JDAT_FILES 20
#define NUM_CLEAR_OPS 20
using namespace boost::unit_test;
@@ -281,7 +284,7 @@
void create_jrnl_fileset(const char* dirname, const char* base_filename)
{
create_jinf_file(dirname, base_filename);
- for (u_int32_t fid = 0; fid < NUM_JDAT_FILES; fid++)
+ for (u_int32_t fid = 0; fid < NUM_JFILES; fid++)
{
u_int64_t rid = 0x12340000 + (fid * 0x25);
create_jdat_file(dirname, base_filename, fid, rid);
@@ -293,7 +296,7 @@
{
std::stringstream fn;
fn << dirname << "/" << base_filename <<
".";
- fn << std::setfill('0') << std::setw(4) << std::hex
<< fid << ".jdat";
+ fn << std::setfill('0') << std::hex << std::setw(4)
<< fid << ".jdat";
file_hdr fh(RHM_JDAT_FILE_MAGIC, RHM_JDAT_VERSION, 0, first_rid, fid, 0x200, true);
std::ofstream of(fn.str().c_str(), std::ofstream::out | std::ofstream::trunc);
if (!of.good())
@@ -306,7 +309,7 @@
{
timespec ts;
::clock_gettime(CLOCK_REALTIME, &ts);
- jinf ji("test journal id", dirname, base_filename, ts);
+ jinf ji("test journal id", dirname, base_filename, NUM_JFILES,
JFSIZE_SBLKS, ts);
ji.write();
}
@@ -319,7 +322,7 @@
BOOST_CHECK_EQUAL(count_dir_contents(dirname, false, true), num_subdirs);
// Journal file count
- unsigned num_jrnl_files = jrnl_present ? NUM_JDAT_FILES + 1 : 0;
+ unsigned num_jrnl_files = jrnl_present ? NUM_JFILES + 1 : 0;
BOOST_CHECK_EQUAL(count_dir_contents(dirname, true, false), num_jrnl_files);
// Check journal files are present
Modified: store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -40,6 +40,9 @@
#include <jrnl/jexception.hpp>
#include <jrnl/jinf.hpp>
+#define NUM_JFILES 4
+#define JFSIZE_SBLKS 128
+
using namespace boost::unit_test;
using namespace rhm::journal;
@@ -66,7 +69,7 @@
void test_write_constructor()
{
::clock_gettime(CLOCK_REALTIME, &ts);
- jinf ji(jid, jdir, base_filename, ts);
+ jinf ji(jid, jdir, base_filename, NUM_JFILES, JFSIZE_SBLKS, ts);
BOOST_CHECK_EQUAL(ji.jver(), RHM_JDAT_VERSION);
BOOST_CHECK(ji.jid().compare(jid) == 0);
BOOST_CHECK(ji.jdir().compare(jdir) == 0);
@@ -74,8 +77,8 @@
timespec this_ts = ji.ts();
BOOST_CHECK_EQUAL(this_ts.tv_sec, ts.tv_sec);
BOOST_CHECK_EQUAL(this_ts.tv_nsec, ts.tv_nsec);
- BOOST_CHECK_EQUAL(ji.num_files(), (u_int16_t)JRNL_NUM_FILES);
- BOOST_CHECK_EQUAL(ji.fsize_sblks(), (u_int32_t)JRNL_FILE_SIZE);
+ BOOST_CHECK_EQUAL(ji.num_jfiles(), (u_int16_t)NUM_JFILES);
+ BOOST_CHECK_EQUAL(ji.jfsize_sblks(), (u_int32_t)JFSIZE_SBLKS);
BOOST_CHECK_EQUAL(ji.sblk_size_dblks(), (u_int16_t)JRNL_SBLK_SIZE);
BOOST_CHECK_EQUAL(ji.dblk_size(), (u_int32_t)JRNL_DBLK_SIZE);
BOOST_CHECK_EQUAL(ji.wmgr_page_size_dblks(), (u_int32_t)JRNL_WMGR_PAGE_SIZE);
@@ -97,8 +100,8 @@
const timespec this_ts = ji.ts();
BOOST_CHECK_EQUAL(this_ts.tv_sec, ts.tv_sec);
BOOST_CHECK_EQUAL(this_ts.tv_nsec, ts.tv_nsec);
- BOOST_CHECK_EQUAL(ji.num_files(), (u_int16_t)JRNL_NUM_FILES);
- BOOST_CHECK_EQUAL(ji.fsize_sblks(), (u_int32_t)JRNL_FILE_SIZE);
+ BOOST_CHECK_EQUAL(ji.num_jfiles(), (u_int16_t)NUM_JFILES);
+ BOOST_CHECK_EQUAL(ji.jfsize_sblks(), (u_int32_t)JFSIZE_SBLKS);
BOOST_CHECK_EQUAL(ji.sblk_size_dblks(), (u_int16_t)JRNL_SBLK_SIZE);
BOOST_CHECK_EQUAL(ji.dblk_size(), (u_int32_t)JRNL_DBLK_SIZE);
BOOST_CHECK_EQUAL(ji.wmgr_page_size_dblks(), (u_int32_t)JRNL_WMGR_PAGE_SIZE);
@@ -138,7 +141,7 @@
void test_analyze_linear_journal()
{
std::vector<std::string> jfiles;
- for (int i=0; i<JRNL_NUM_FILES; i++)
+ for (int i=0; i<NUM_JFILES; i++)
{
create_journal_filenames(jfiles);
@@ -160,11 +163,11 @@
void create_journal_filenames(std::vector<std::string>& jfiles)
{
- for (int fnum=0; fnum<JRNL_NUM_FILES; fnum++)
+ for (int fnum=0; fnum<NUM_JFILES; fnum++)
{
std::stringstream fn;
fn << jdir << "/" << base_filename <<
".";
- fn << std::setfill('0') << std::setw(4) << fnum
<< "." << JRNL_DATA_EXTENSION;
+ fn << std::setfill('0') << std::hex << std::setw(4)
<< fnum << "." << JRNL_DATA_EXTENSION;
jfiles.push_back(fn.str());
}
}
@@ -176,7 +179,7 @@
file_hdr fh;
std::vector<std::string>::iterator itr;
u_int32_t fid = 0;
- u_int64_t rid = rid_offs + ((JRNL_NUM_FILES - min_fid_offs) % JRNL_NUM_FILES) *
rid_incr;
+ u_int64_t rid = rid_offs + ((NUM_JFILES - min_fid_offs) % NUM_JFILES) * rid_incr;
for (itr=jfiles.begin(); itr<jfiles.end(); itr++)
{
@@ -200,7 +203,7 @@
of.close();
if (++fid == min_fid_offs)
- rid -= rid_incr * (JRNL_NUM_FILES - 1);
+ rid -= rid_incr * (NUM_JFILES - 1);
else
rid += rid_incr;
}