rhmessaging commits: r1407 - in store/trunk/cpp: lib/jrnl and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-11-30 17:57:44 -0500 (Fri, 30 Nov 2007)
New Revision: 1407
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/deq_rec.cpp
store/trunk/cpp/lib/jrnl/deq_rec.hpp
store/trunk/cpp/lib/jrnl/enq_rec.cpp
store/trunk/cpp/lib/jrnl/enq_rec.hpp
store/trunk/cpp/lib/jrnl/jcfg.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jdir.cpp
store/trunk/cpp/lib/jrnl/jexception.hpp
store/trunk/cpp/lib/jrnl/jinf.cpp
store/trunk/cpp/lib/jrnl/jinf.hpp
store/trunk/cpp/lib/jrnl/jrec.hpp
store/trunk/cpp/lib/jrnl/lfh.cpp
store/trunk/cpp/lib/jrnl/lfh.hpp
store/trunk/cpp/lib/jrnl/nlfh.cpp
store/trunk/cpp/lib/jrnl/nlfh.hpp
store/trunk/cpp/lib/jrnl/pmgr.cpp
store/trunk/cpp/lib/jrnl/rcvdat.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/txn_rec.cpp
store/trunk/cpp/lib/jrnl/txn_rec.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.hpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
store/trunk/cpp/tests/jrnl/unit_test_jdir.cpp
store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp
Log:
The two #defines from jcfg.hpp, JRNL_FILE_SIZE (the journal file size in sblks) and JRNL_NUM_FILES, are now parameterized as far as BdbMessageStore, where they are defined as consts. Hardwired minima apply to both. Also some minor tidy-up around error handling for ::malloc().
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -60,6 +60,8 @@
enqueueXidDb(&env, 0),
dequeueXidDb(&env, 0),
prepareXidDb(&env, 0),
+ numJrnlFiles(8), // TODO - make param
+ jrnlFsizeSblks(3072), // TODO - make param
isInit(false),
envPath(envpath)
@@ -202,7 +204,7 @@
THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
}
if (usingJrnl()) {
- JournalImpl* jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
+ JournalImpl* jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), string("JournalData"), numJrnlFiles, jrnlFsizeSblks, defJournalGetEventsTimeout, defJournalFlushTimeout);
queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
try {
// init will create the deque's for the init...
@@ -381,7 +383,7 @@
if (usingJrnl())
{
const char* queueName = queue->getName().c_str();
- JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
+ JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), numJrnlFiles, jrnlFsizeSblks, defJournalGetEventsTimeout, defJournalFlushTimeout);
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
try
@@ -982,8 +984,7 @@
usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
break;
case rhm::journal::RHM_IORES_FULL:
-// Temporary error msg till exception handler core problem solved...
-std::cerr << "Error storing message -- Journal full on queue \"" << queue->getName() << "\"" << std::endl << std::flush;
+ std::cerr << "Error storing message -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
THROW_STORE_FULL_EXCEPTION("Error storing message -- Journal full :" + queue->getName());
break;
default:
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-11-30 22:57:44 UTC (rev 1407)
@@ -76,6 +76,8 @@
IdSequence messageIdSequence;
static bool useAsync;
std::string storeDir;
+ const u_int16_t numJrnlFiles;
+ const u_int32_t jrnlFsizeSblks;
bool isInit;
const char* envPath;
static qpid::sys::Duration defJournalGetEventsTimeout;
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -37,9 +37,11 @@
JournalImpl::JournalImpl(const std::string& journalId,
const std::string& journalDirectory,
const std::string& journalBaseFilename,
+ const u_int16_t num_jfiles,
+ const u_int32_t jfsize_sblks,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout):
- jcntl(journalId, journalDirectory, journalBaseFilename),
+ jcntl(journalId, journalDirectory, journalBaseFilename, num_jfiles, jfsize_sblks),
getEventsTimerSetFlag(false),
writeActivityFlag(false),
flushTriggeredFlag(true),
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-11-30 22:57:44 UTC (rev 1407)
@@ -85,6 +85,8 @@
JournalImpl(const std::string& journalId,
const std::string& journalDirectory,
const std::string& journalBaseFilename,
+ const u_int16_t num_jfiles,
+ const u_int32_t jfsize_sblks,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout);
virtual ~JournalImpl();
Modified: store/trunk/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -59,7 +59,9 @@
{}
deq_rec::~deq_rec()
-{}
+{
+ clean();
+}
void
deq_rec::reset()
@@ -274,12 +276,7 @@
if (_deq_hdr._xidsize)
{
_buff = ::malloc(_deq_hdr._xidsize);
- if (_buff == NULL)
- {
- std::stringstream ss;
- ss << "_buff malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "deq_rec", "decode");
- }
+ MALLOC_CHK(_buff, "_buff", "deq_rec", "decode");
const u_int32_t hdr_xid_dblks = size_dblks(deq_hdr::size() + _deq_hdr._xidsize);
const u_int32_t hdr_xid_tail_dblks = size_dblks(deq_hdr::size() + _deq_hdr._xidsize +
rec_tail::size());
@@ -347,12 +344,7 @@
if (_deq_hdr._xidsize)
{
_buff = ::malloc(_deq_hdr._xidsize);
- if (_buff == NULL)
- {
- std::stringstream ss;
- ss << "_buff malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "deq_rec", "rcv_decode");
- }
+ MALLOC_CHK(_buff, "_buff", "deq_rec", "rcv_decode");
// Decode xid
ifsp->read((char*)_buff, _deq_hdr._xidsize);
size_t size_read = ifsp->gcount();
@@ -434,5 +426,11 @@
jrec::chk_tail(_deq_tail, _deq_hdr._hdr);
}
+void
+deq_rec::clean()
+{
+ // clean up allocated memory here
+}
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/deq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -92,6 +92,7 @@
virtual void chk_hdr() const throw (jexception);
virtual void chk_hdr(u_int64_t rid) const throw (jexception);
virtual void chk_tail() const throw (jexception);
+ virtual void clean();
}; // class deq_rec
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -65,7 +65,9 @@
{}
enq_rec::~enq_rec()
-{}
+{
+ clean();
+}
// Prepare instance for use in reading data from journal, where buf contains preallocated space
// to receive data.
@@ -354,12 +356,7 @@
if (_enq_hdr._xidsize + (_enq_hdr.is_external() ? 0 : _enq_hdr._dsize))
{
_buff = ::malloc(_enq_hdr._xidsize + (_enq_hdr.is_external() ? 0 : _enq_hdr._dsize));
- if (_buff == NULL)
- {
- std::stringstream ss;
- ss << "_buff malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "enq_rec", "decode");
- }
+ MALLOC_CHK(_buff, "_buff", "enq_rec", "decode");
const u_int32_t hdr_xid_dblks = size_dblks(enq_hdr::size() + _enq_hdr._xidsize);
const u_int32_t hdr_data_dblks = size_dblks(enq_hdr::size() + _enq_hdr._xidsize +
@@ -468,12 +465,7 @@
if (_enq_hdr._xidsize)
{
_buff = ::malloc(_enq_hdr._xidsize);
- if (_buff == NULL)
- {
- std::stringstream ss;
- ss << "_buff malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "enq_rec", "rcv_decode");
- }
+ MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
// Decode xid
ifsp->read((char*)_buff, _enq_hdr._xidsize);
size_t size_read = ifsp->gcount();
@@ -585,5 +577,11 @@
jrec::chk_tail(_enq_tail, _enq_hdr._hdr);
}
+void
+enq_rec::clean()
+{
+ // clean up allocated memory here
+}
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/enq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -107,6 +107,7 @@
void chk_hdr() const throw (jexception);
void chk_hdr(u_int64_t rid) const throw (jexception);
void chk_tail() const throw (jexception);
+ virtual void clean();
}; // class enq_rec
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/jcfg.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcfg.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/jcfg.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -62,8 +62,11 @@
*/
#define JRNL_DBLK_SIZE 128 ///< Data block size in bytes (CANNOT BE LESS THAN 32!)
#define JRNL_SBLK_SIZE 4 ///< Disk softblock size in multiples of JRNL_DBLK_SIZE
-#define JRNL_FILE_SIZE 3072 ///< Journal file size in softblocks excl. file_hdr
-#define JRNL_NUM_FILES 8 ///< Number of journal files
+#define JRNL_MIN_FILE_SIZE 128 ///< Min. jrnl file size in sblks (excl. file_hdr)
+#define JRNL_MIN_NUM_FILES 4 ///< Min. number of journal files
+// The following are now parameterized in the jcntl constructor, and no longer needed here.
+//#define JRNL_FILE_SIZE 3072 ///< Journal file size in softblocks excl. file_hdr
+//#define JRNL_NUM_FILES 8 ///< Number of journal files
#define JRNL_RMGR_PAGE_SIZE 128 ///< Journal page size in softblocks
#define JRNL_RMGR_PAGES 16 ///< Number of pages to use in wmgr
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -41,6 +41,7 @@
#include <jrnl/jinf.hpp>
#include <sstream>
#include <qpid/broker/PersistableMessage.h>
+#include <unistd.h>
namespace rhm
{
@@ -49,7 +50,8 @@
// Functions
-jcntl::jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename):
+jcntl::jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename,
+ const u_int16_t num_jfiles, const u_int32_t jfsize_sblks):
_jid(jid),
_jdir(jdir, base_filename),
_base_filename(base_filename),
@@ -57,13 +59,16 @@
_stop_flag(false),
_readonly_flag(false),
_autostop(true),
+ _num_jfiles(num_jfiles > JRNL_MIN_NUM_FILES ? num_jfiles : JRNL_MIN_NUM_FILES),
+ _jfsize_sblks(jfsize_sblks > JRNL_MIN_FILE_SIZE ? jfsize_sblks :JRNL_MIN_FILE_SIZE),
_datafh(NULL),
_emap(),
_tmap(),
_rrfc(),
_wrfc(),
_rmgr(this, _emap, _tmap, _rrfc),
- _wmgr(this, _emap, _tmap, _wrfc)
+ _wmgr(this, _emap, _tmap, _wrfc),
+ _rcvdat(_num_jfiles)
{
pthread_mutex_init(&_mutex, NULL);
}
@@ -75,7 +80,7 @@
catch (const jexception& e) { std::cerr << e << std::endl; }
if (_datafh)
{
- for (u_int32_t i=0; i<JRNL_NUM_FILES; i++)
+ for (u_int32_t i=0; i<_num_jfiles; i++)
if (_datafh[i])
::delete _datafh[i];
::delete[] _datafh;
@@ -94,28 +99,28 @@
// TODO - place this in a finalize() fn? - see ~jcntl()...
if (_datafh)
{
- for (u_int32_t i=0; i<JRNL_NUM_FILES; i++)
+ for (u_int32_t i=0; i<_num_jfiles; i++)
if (_datafh[i])
::delete _datafh[i];
::delete[] _datafh;
}
- _datafh = ::new lfh*[JRNL_NUM_FILES];
+ _datafh = ::new lfh*[_num_jfiles];
// NULL the pointer array first because new() can throw exceptions
- ::memset(_datafh, 0, sizeof(lfh*) * JRNL_NUM_FILES);
- for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
+ ::memset(_datafh, 0, sizeof(lfh*) * _num_jfiles);
+ for (u_int16_t i=0; i<_num_jfiles; i++)
{
std::stringstream ss;
ss << _jdir << "/" << _base_filename;
- _datafh[i] = ::new lfh(ss.str(), i, NULL);
+ _datafh[i] = ::new lfh(ss.str(), i, _jfsize_sblks, NULL);
}
// TODO: Check the following comment/note (may be obsolete):
// NOTE: The write RFC must initialize first. This sets all the file handle object
// (lfh) counters and pointers for both read and write, since write activity
// constrains read activity (i.e. one can't read what has not yet been written).
- _wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh);
- _rrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh);
+ _wrfc.initialize(_num_jfiles, (nlfh**)_datafh);
+ _rrfc.initialize(_num_jfiles, (nlfh**)_datafh);
_rmgr.initialize(rdtoklp, rd_cb);
_wmgr.initialize(wdtoklp, wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
@@ -145,28 +150,28 @@
if (_datafh)
{
- for (u_int32_t i=0; i<JRNL_NUM_FILES; i++)
+ for (u_int32_t i=0; i<_num_jfiles; i++)
if (_datafh[i])
::delete _datafh[i];
::delete[] _datafh;
}
- _datafh = ::new lfh*[JRNL_NUM_FILES];
+ _datafh = ::new lfh*[_num_jfiles];
// NULL the pointer array first because new() can throw exceptions
- ::memset(_datafh, 0, sizeof(lfh*) * JRNL_NUM_FILES);
- for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
+ ::memset(_datafh, 0, sizeof(lfh*) * _num_jfiles);
+ for (u_int16_t i=0; i<_num_jfiles; i++)
{
std::stringstream ss;
ss << _jdir << "/" << _base_filename;
- _datafh[i] = ::new lfh(ss.str(), i, &_rcvdat);
+ _datafh[i] = ::new lfh(ss.str(), i, _jfsize_sblks, &_rcvdat);
}
// TODO: Check the following comment/note (may be obsolete):
// NOTE: The write RFC must initialize first. This sets all the file handle object
// (lfh) counters and pointers for both read and write, since write activity
// constrains read activity (i.e. one can't read what has not yet been written).
- _wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, &_rcvdat);
- _rrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, _rcvdat._ffid);
+ _wrfc.initialize(_num_jfiles, (nlfh**)_datafh, &_rcvdat);
+ _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
_rmgr.initialize(rdtoklp, rd_cb);
_wmgr.initialize(wdtoklp, wr_cb, JRNL_WMGR_MAXDTOKPP, JRNL_WMGR_MAXWAITUS);
@@ -179,10 +184,10 @@
{
if (!_readonly_flag)
throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl", "recover_complete");
- for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
+ for (u_int16_t i=0; i<_num_jfiles; i++)
_datafh[i]->reset(&_rcvdat);
- _wrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, &_rcvdat);
- _rrfc.initialize(JRNL_NUM_FILES, (nlfh**)_datafh, _rcvdat._ffid);
+ _wrfc.initialize(_num_jfiles, (nlfh**)_datafh, &_rcvdat);
+ _rrfc.initialize(_num_jfiles, (nlfh**)_datafh, _rcvdat._ffid);
_rmgr.recover_complete();
_readonly_flag = false;
//std::cout << "Journal revovery complete." << std::endl;
@@ -429,7 +434,7 @@
ss << "errno=" << errno;
throw jexception(jerrno::JERR__RTCLOCK, ss.str().c_str(), "jcntl", "write_infofile");
}
- jinf ji(_jid, _jdir.dirname(), _base_filename, ts);
+ jinf ji(_jid, _jdir.dirname(), _base_filename, _num_jfiles, _jfsize_sblks, ts);
ji.write();
}
@@ -472,7 +477,7 @@
while (rcvr_get_next_record(fid, &ifs, rd));
// Check for journal full condition
- u_int16_t next_wr_fid = (rd._lfid + 1) % JRNL_NUM_FILES;
+ u_int16_t next_wr_fid = (rd._lfid + 1) % _num_jfiles;
if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
rd._full = true;
@@ -523,7 +528,7 @@
assert(xidp != NULL);
std::string xid((char*)xidp, er.xid_size());
_tmap.insert_txn_data(xid, txn_data(h._rid, 0, fid, true));
- free(xidp);
+ ::free(xidp);
}
else
_emap.insert_fid(h._rid, fid);
@@ -555,7 +560,7 @@
assert(xidp != NULL);
std::string xid((char*)xidp, dr.xid_size());
_tmap.insert_txn_data(xid, txn_data(dr.rid(), dr.deq_rid(), fid, false));
- free(xidp);
+ ::free(xidp);
}
else
{
@@ -603,7 +608,7 @@
if (itr->_enq_flag)
rd._enq_cnt_list[itr->_fid]--;
}
- free(xidp);
+ ::free(xidp);
}
break;
case RHM_JDAT_TXC_MAGIC:
@@ -633,8 +638,7 @@
rd._enq_cnt_list[fid]--;
}
}
-
- free(xidp);
+ ::free(xidp);
}
break;
case RHM_JDAT_EMPTY_MAGIC:
@@ -651,7 +655,7 @@
return false;
default:
// Is this the last file, if so, stop as this is the overwrite boundary.
- if (fid == (rd._ffid ? rd._ffid - 1 : JRNL_NUM_FILES - 1))
+ if (fid == (rd._ffid ? rd._ffid - 1 : _num_jfiles - 1))
{
rd._lfid = fid;
rd._eo = read_pos;
@@ -677,7 +681,7 @@
rd._eo = ifsp->tellg(); // remember file offset before closing
rd._lfid = fid++;
ifsp->close();
- if (fid >= JRNL_NUM_FILES)
+ if (fid >= _num_jfiles)
{
fid = 0;
rd._owi = !rd._owi; // Flip owi
@@ -720,7 +724,7 @@
{
if (rd._ffid ? h.get_owi() == rd._owi : h.get_owi() != rd._owi) // Overwrite indicator changed
{
- u_int16_t expected_fid = rd._ffid ? rd._ffid - 1 : JRNL_NUM_FILES - 1;
+ u_int16_t expected_fid = rd._ffid ? rd._ffid - 1 : _num_jfiles - 1;
if (fid == expected_fid)
{
rd._lfid = fid;
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -131,6 +131,8 @@
bool _autostop; ///< Autostop flag - stops journal when overrun occurs
// Journal control structures
+ const u_int16_t _num_jfiles; ///< Number of journal files
+ const u_int32_t _jfsize_sblks; ///< Journal file size in sblks
lfh** _datafh; ///< Array of pointers to data file handles
enq_map _emap; ///< Enqueue map for low water mark management
txn_map _tmap; ///< Transaction map open transactions
@@ -139,7 +141,7 @@
rmgr _rmgr; ///< Read page manager which manages AIO
wmgr _wmgr; ///< Write page manager which manages AIO
rcvdat _rcvdat; ///< Recovery data used for recovery
- pthread_mutex_t _mutex; ///< Mutex for tread safety
+ pthread_mutex_t _mutex; ///< Mutex for thread safety
std::deque<rhm::journal::data_tok*> _aio_rd_cmpl_dtok_list; ///< Internally mamanged deque
std::deque<rhm::journal::data_tok*> _aio_wr_cmpl_dtok_list; ///< Internally mamanged deque
@@ -153,8 +155,11 @@
* \param jid A unique identifier for this journal instance.
* \param jdir The directory which will contain the journal files.
* \param base_filename The string which will be used to start all journal filenames.
+ * \param num_jfiles The number of journal files to be created.
+ * \param jfsize_sblks The size of each journal file expressed in softblocks.
*/
- jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename);
+ jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename,
+ const u_int16_t num_jfiles, const u_int32_t jfsize_sblks);
/**
* \brief Destructor.
@@ -606,9 +611,12 @@
*/
inline const std::string& base_filename() const { return _base_filename; }
+ inline const u_int16_t num_jfiles() const { return _num_jfiles; }
+ inline const u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
+
protected:
/**
* \brief Check status of journal before allowing write operations.
Modified: store/trunk/cpp/lib/jrnl/jdir.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/jdir.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -135,11 +135,11 @@
while ((entry = ::readdir(dir)) != NULL)
{
// Ignore . and ..
- if (strcmp(entry->d_name, ".") != 0 && strcmp(entry->d_name, "..") != 0)
+ if (::strcmp(entry->d_name, ".") != 0 && ::strcmp(entry->d_name, "..") != 0)
{
- if (strlen(entry->d_name) > base_filename.size())
+ if (::strlen(entry->d_name) > base_filename.size())
{
- if (strncmp(entry->d_name, base_filename.c_str(), base_filename.size()) == 0)
+ if (::strncmp(entry->d_name, base_filename.c_str(), base_filename.size()) == 0)
{
if (!found)
{
@@ -208,7 +208,7 @@
// Read jinf file, then verify all journal files are present
jinf ji(dirname + "/" + base_filename + "." + JRNL_INFO_EXTENSION, true);
- for (u_int16_t fnum=0; fnum < ji.num_files(); fnum++)
+ for (u_int16_t fnum=0; fnum < ji.num_jfiles(); fnum++)
{
std::stringstream ss;
ss << dirname << "/" << base_filename << ".";
@@ -334,15 +334,16 @@
while ((entry = ::readdir(dir)) != NULL)
{
// Ignore . and ..
- if (strcmp(entry->d_name, ".") != 0 && strcmp(entry->d_name, "..") != 0)
+ if (::strcmp(entry->d_name, ".") != 0 && ::strcmp(entry->d_name, "..") != 0)
{
- if (strlen(entry->d_name) == base_filename.size() + 10) // Format: basename.bak.XXXX
+ if (::strlen(entry->d_name) == base_filename.size() + 10) // Format: basename.bak.XXXX
{
std::stringstream ss;
ss << "_" << base_filename << ".bak.";
- if (strncmp(entry->d_name, ss.str().c_str(), base_filename.size() + 6) == 0)
+ if (::strncmp(entry->d_name, ss.str().c_str(), base_filename.size() + 6) == 0)
{
- long this_dir_num = strtol(entry->d_name + base_filename.size() + 6, NULL, 16);
+ long this_dir_num = ::strtol(entry->d_name + base_filename.size() + 6,
+ NULL, 16);
if (this_dir_num > dir_num)
dir_num = this_dir_num;
}
Modified: store/trunk/cpp/lib/jrnl/jexception.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jexception.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/jexception.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -47,6 +47,15 @@
#include <string>
#include <sys/types.h>
+// Macro for formatting commom system errors
+#define FORMAT_SYSERR(errno) " errno=" << errno << " (" << strerror(errno) << ")"
+#define MALLOC_CHK(ptr, var, cls, fn) if (ptr == NULL) { \
+ clean(); \
+ std::stringstream ss; \
+ ss << var << ": malloc() failed: " << FORMAT_SYSERR(errno); \
+ throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), cls, fn); \
+ }
+
#define MAX_MSG_SIZE 1024
#define MAX_THROWING_SIZE 128
Modified: store/trunk/cpp/lib/jrnl/jinf.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/jinf.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -45,8 +45,8 @@
jinf::jinf(const std::string& jinf_filename, bool validate_flag) throw (jexception):
_jver(0),
- _num_files(0),
- _fsize_sblks(0),
+ _num_jfiles(0),
+ _jfsize_sblks(0),
_sblk_size_dblks(0),
_dblk_size(0),
_wmgr_page_size_dblks(0),
@@ -65,14 +65,14 @@
}
jinf::jinf(const std::string& jid, const std::string& jdir, const std::string& base_filename,
- const timespec& ts):
+ const u_int16_t num_jfiles, const u_int32_t jfsize_sblks, const timespec& ts):
_jver(RHM_JDAT_VERSION),
_jid(jid),
_jdir(jdir),
_base_filename(base_filename),
_ts(ts),
- _num_files(JRNL_NUM_FILES),
- _fsize_sblks(JRNL_FILE_SIZE),
+ _num_jfiles(num_jfiles),
+ _jfsize_sblks(jfsize_sblks),
_sblk_size_dblks(JRNL_SBLK_SIZE),
_dblk_size(JRNL_DBLK_SIZE),
_wmgr_page_size_dblks(JRNL_WMGR_PAGE_SIZE),
@@ -100,16 +100,16 @@
ss << "; required=" << RHM_JDAT_VERSION << std::endl;
err = true;
}
- if (_num_files != JRNL_NUM_FILES)
+ if (_num_jfiles < JRNL_MIN_NUM_FILES)
{
- ss << "JRNL_NUM_FILES mismatch: " << _num_files;
- ss << "; required=" << JRNL_NUM_FILES << std::endl;
+ ss << "Number of journal files too small: found=" << _num_jfiles;
+ ss << "; minimum=" << JRNL_MIN_NUM_FILES << std::endl;
err = true;
}
- if (_fsize_sblks != JRNL_FILE_SIZE)
+ if (_jfsize_sblks < JRNL_MIN_FILE_SIZE)
{
- ss << "JRNL_FILE_SIZE mismatch: " << _fsize_sblks;
- ss << "; required=" << JRNL_FILE_SIZE << std::endl;
+ ss << "Journal file size too small: found=" << _jfsize_sblks;
+ ss << "; minimum=" << JRNL_MIN_FILE_SIZE << " (sblks)" << std::endl;
err = true;
}
if (_sblk_size_dblks != JRNL_SBLK_SIZE)
@@ -162,7 +162,7 @@
if (!_valid_flag)
validate();
- for (u_int16_t fnum=0; fnum < _num_files; fnum++)
+ for (u_int16_t fnum=0; fnum < _num_jfiles; fnum++)
{
std::stringstream ss;
ss << _jdir << "/" << _base_filename << ".";
@@ -235,8 +235,8 @@
ss << " Journal directory: \"" << _jdir << "\"" << std::endl;
ss << " Journal base filename: \"" << _base_filename << "\"" << std::endl;
ss << " Journal version: " << (unsigned)_jver << std::endl;
- ss << " Number of journal files (JRNL_NUM_FILES): " << _num_files << std::endl;
- ss << " Journal file size (JRNL_FILE_SIZE): " << _fsize_sblks << " sblks" << std::endl;
+ ss << " Number of journal files: " << _num_jfiles << std::endl;
+ ss << " Journal file size: " << _jfsize_sblks << " sblks" << std::endl;
ss << " Softblock size (JRNL_SBLK_SIZE): " << _sblk_size_dblks << " dblks" << std::endl;
ss << " Datablock size (JRNL_DBLK_SIZE): " << _dblk_size << " bytes" << std::endl;
ss << " Write page size (JRNL_WMGR_PAGE_SIZE): " << _wmgr_page_size_dblks << " dblks" <<
@@ -273,8 +273,8 @@
ss << "\" />" << std::endl;
ss << " </creation_time>" << std::endl;
ss << " <journal_file_geometry>" << std::endl;
- ss << " <JRNL_NUM_FILES value=\"" << _num_files << "\" />" << std::endl;
- ss << " <JRNL_FILE_SIZE value=\"" << _fsize_sblks << "\" />" << std::endl;
+ ss << " <number_jrnl_files value=\"" << _num_jfiles << "\" />" << std::endl;
+ ss << " <jrnl_file_size_sblks value=\"" << _jfsize_sblks << "\" />" << std::endl;
ss << " <JRNL_SBLK_SIZE value=\"" << _sblk_size_dblks << "\" />" << std::endl;
ss << " <JRNL_DBLK_SIZE value=\"" << _dblk_size << "\" />" << std::endl;
ss << " <JRNL_WMGR_PAGE_SIZE value=\"" << _wmgr_page_size_dblks << "\" />" << std::endl;
@@ -307,10 +307,10 @@
string_value(_jdir, buff);
else if(::strstr(buff, "base_filename"))
string_value(_base_filename, buff);
- else if(::strstr(buff, "JRNL_NUM_FILES"))
- _num_files = u_int16_value(buff);
- else if(::strstr(buff, "JRNL_FILE_SIZE"))
- _fsize_sblks = u_int32_value(buff);
+ else if(::strstr(buff, "number_jrnl_files"))
+ _num_jfiles = u_int16_value(buff);
+ else if(::strstr(buff, "jrnl_file_size_sblks"))
+ _jfsize_sblks = u_int32_value(buff);
else if(::strstr(buff, "JRNL_SBLK_SIZE"))
_sblk_size_dblks = u_int16_value(buff);
else if(::strstr(buff, "JRNL_DBLK_SIZE"))
Modified: store/trunk/cpp/lib/jrnl/jinf.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/jinf.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -52,8 +52,8 @@
std::string _jdir;
std::string _base_filename;
timespec _ts;
- u_int16_t _num_files;
- u_int32_t _fsize_sblks;
+ u_int16_t _num_jfiles;
+ u_int32_t _jfsize_sblks;
u_int16_t _sblk_size_dblks;
u_int32_t _dblk_size;
u_int32_t _wmgr_page_size_dblks;
@@ -71,7 +71,7 @@
jinf(const std::string& jinf_filename, bool validate_flag) throw (jexception);
// constructor for writing jinf file
jinf(const std::string& jid, const std::string& jdir, const std::string& base_filename,
- const timespec& ts);
+ const u_int16_t num_jfiles, const u_int32_t jfsize_sblks, const timespec& ts);
virtual ~jinf();
void validate() throw (jexception);
@@ -83,8 +83,8 @@
inline const std::string& jdir() const { return _jdir; }
inline const std::string& base_filename() const { return _base_filename; }
inline const timespec& ts() const { return _ts; }
- inline const u_int16_t num_files() const { return _num_files; }
- inline const u_int32_t fsize_sblks() const { return _fsize_sblks; }
+ inline const u_int16_t num_jfiles() const { return _num_jfiles; }
+ inline const u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
inline const u_int16_t sblk_size_dblks() const { return _sblk_size_dblks; }
inline const u_int32_t dblk_size() const { return _dblk_size; }
inline const u_int32_t wmgr_page_size_dblks() const { return _wmgr_page_size_dblks; }
Modified: store/trunk/cpp/lib/jrnl/jrec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jrec.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/jrec.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -172,6 +172,7 @@
static void chk_hdr(const hdr& hdr) throw (jexception);
static void chk_rid(const hdr& hdr, u_int64_t rid) throw (jexception);
static void chk_tail(const rec_tail& tail, const hdr& hdr) throw (jexception);
+ virtual void clean() = 0;
}; // class jrec
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/lfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/lfh.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/lfh.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -38,13 +38,13 @@
namespace journal
{
-lfh::lfh():
- nlfh()
+lfh::lfh(const u_int32_t jfsize_sblks):
+ nlfh(jfsize_sblks)
{}
-lfh::lfh(const std::string& fbasename, const u_int16_t fid, rcvdat const * const ro)
- throw (jexception*):
- nlfh(fbasename, fid, ro)
+lfh::lfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t jfsize_sblks,
+ rcvdat const * const ro) throw (jexception*):
+ nlfh(fbasename, fid, jfsize_sblks, ro)
{}
lfh::~lfh()
Modified: store/trunk/cpp/lib/jrnl/lfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/lfh.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/lfh.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -55,9 +55,9 @@
class lfh : public nlfh
{
public:
- lfh();
- lfh(const std::string& fbasename, const u_int16_t fid, rcvdat const * const ro)
- throw (jexception*);
+ lfh(const u_int32_t jfsize_sblks);
+ lfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t jfsize_sblks,
+ rcvdat const * const ro) throw (jexception*);
virtual ~lfh();
};
Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -45,52 +45,54 @@
namespace journal
{
-nlfh::nlfh():
+nlfh::nlfh(const u_int32_t jfsize_sblks):
_fname(),
_fid(0),
+ _ffull_dblks(JRNL_SBLK_SIZE * (jfsize_sblks + 1)),
_rd_fh(-1),
_wr_fh(-1),
_rec_enqcnt(0),
_rd_subm_cnt_dblks(0),
_rd_cmpl_cnt_dblks(0),
#ifdef RHM_RDONLY
- _wr_subm_cnt_dblks(JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1)),
- _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1))
+ _wr_subm_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1)),
+ _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1))
#else
_wr_subm_cnt_dblks(0),
_wr_cmpl_cnt_dblks(0)
#endif
{}
-nlfh::nlfh(const std::string& fbasename, const u_int16_t fid, const rcvdat* const ro)
- throw (jexception):
+nlfh::nlfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t jfsize_sblks,
+ const rcvdat* const ro) throw (jexception):
_fname(),
_fid(fid),
+ _ffull_dblks(JRNL_SBLK_SIZE * (jfsize_sblks + 1)),
_rd_fh(-1),
_wr_fh(-1),
_rec_enqcnt(0),
_rd_subm_cnt_dblks(0),
_rd_cmpl_cnt_dblks(0),
#ifdef RHM_RDONLY
- _wr_subm_cnt_dblks(JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1)),
- _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1))
+ _wr_subm_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1)),
+ _wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE * (jfsize_dblks + 1))
#else
_wr_subm_cnt_dblks(0),
_wr_cmpl_cnt_dblks(0)
#endif
{
- initialize(fbasename, fid, ro);
- open();
+ initialize(fbasename, fid, jfsize_sblks, ro);
+ open_fh();
}
nlfh::~nlfh()
{
- close();
+ close_fh();
}
void
nlfh::initialize(const std::string& fbasename, const u_int16_t fid,
- const rcvdat* const ro) throw (jexception)
+ const u_int32_t jfsize_sblks, const rcvdat* const ro) throw (jexception)
{
std::stringstream ss;
@@ -122,8 +124,8 @@
}
else
{
- _wr_subm_cnt_dblks = JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1);
- _wr_cmpl_cnt_dblks = JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1);
+ _wr_subm_cnt_dblks = _ffull_dblks;
+ _wr_cmpl_cnt_dblks = _ffull_dblks;
}
_rec_enqcnt = ro->_enq_cnt_list[_fid];
}
@@ -134,10 +136,10 @@
const size_t writesize = sblksize * JRNL_WMGR_PAGE_SIZE;
// NOTE: The journal file size is always one sblock bigger than the specified journal
- // file size (JRNL_FILE_SIZE in jcfg.hpp), which is the data content size. The extra
+ // file size, which is the data content size. The extra
// block is for the journal file header which precedes all data on each file and is
// exactly one softblock in size.
- u_int32_t nsblks = JRNL_FILE_SIZE + 1;
+ u_int32_t nsblks = jfsize_sblks + 1;
void* nullbuf = NULL;
if (::posix_memalign(&nullbuf, sblksize, writesize))
{
@@ -204,8 +206,8 @@
}
else
{
- _wr_subm_cnt_dblks = JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1);
- _wr_cmpl_cnt_dblks = JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1);
+ _wr_subm_cnt_dblks = _ffull_dblks;
+ _wr_cmpl_cnt_dblks = _ffull_dblks;
}
_rec_enqcnt = ro->_enq_cnt_list[_fid];
return true;
@@ -287,11 +289,11 @@
const u_int32_t
nlfh::add_wr_subm_cnt_dblks(u_int32_t a) throw (jexception)
{
- if (_wr_subm_cnt_dblks + a > JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1)) // Allow for file header
+ if (_wr_subm_cnt_dblks + a > _ffull_dblks) // Allow for file header
{
std::stringstream ss;
ss << "_wr_subm_cnt_dblks=" << _wr_subm_cnt_dblks << " incr=" << a;
- ss << " fsize=" << JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1) << " dblks";
+ ss << " fsize=" << _ffull_dblks << " dblks";
throw jexception(jerrno::JERR_NLFH_FILEOFFSOVFL, ss.str().c_str(), "nlfh",
"add_wr_subm_cnt_dblks");
}
@@ -328,7 +330,7 @@
// Private functions
void
-nlfh::open() throw (jexception)
+nlfh::open_fh() throw (jexception)
{
_rd_fh = ::open(_fname.c_str(), O_RDONLY | O_DIRECT);
if (_rd_fh < 0)
@@ -348,7 +350,7 @@
}
void
-nlfh::close()
+nlfh::close_fh()
{
if (_rd_fh >= 0)
{
@@ -362,9 +364,5 @@
}
}
-// static const definition
-
-const u_int32_t nlfh::_fsize_dblks = JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1);
-
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/nlfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/nlfh.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -58,9 +58,9 @@
class nlfh
{
protected:
- static const u_int32_t _fsize_dblks; ///< File size in dblks
std::string _fname; ///< File name
u_int16_t _fid; ///< File ID (ordinal number in ring buffer)
+ const u_int32_t _ffull_dblks; ///< File size in dblks (incl. file header)
int _rd_fh; ///< Read file handle
int _wr_fh; ///< Write file handle
u_int32_t _rec_enqcnt; ///< Count of enqueued records
@@ -70,14 +70,14 @@
u_int32_t _wr_cmpl_cnt_dblks; ///< Write file count (data blocks) for completed AIO
public:
- nlfh();
+ nlfh(const u_int32_t jfsize_sblks);
// Constructors with implicit initialize() and open()
- nlfh(const std::string& fbasename, const u_int16_t fid, const rcvdat* const ro)
- throw (jexception);
+ nlfh(const std::string& fbasename, const u_int16_t fid, const u_int32_t jfsize_sblks,
+ const rcvdat* const ro) throw (jexception);
virtual ~nlfh();
virtual void initialize(const std::string& fbasename, const u_int16_t fid,
- const rcvdat* const ro) throw (jexception);
+ const u_int32_t jfsize_sblks, const rcvdat* const ro) throw (jexception);
virtual bool reset(const rcvdat* const ro = NULL);
inline const std::string& fname() const { return _fname; }
@@ -120,9 +120,9 @@
inline const bool wr_empty() const { return _wr_subm_cnt_dblks == 0; }
inline const u_int32_t wr_remaining_dblks() const
- { return _fsize_dblks - _wr_subm_cnt_dblks; }
- inline const bool is_wr_full() const { return _fsize_dblks == _wr_subm_cnt_dblks; }
- inline const bool is_wr_compl() const { return _fsize_dblks == _wr_cmpl_cnt_dblks; }
+ { return _ffull_dblks - _wr_subm_cnt_dblks; }
+ inline const bool is_wr_full() const { return _ffull_dblks == _wr_subm_cnt_dblks; }
+ inline const bool is_wr_compl() const { return _ffull_dblks == _wr_cmpl_cnt_dblks; }
inline const u_int32_t wr_aio_outstanding_dblks() const
{ return _wr_subm_cnt_dblks - _wr_cmpl_cnt_dblks; }
inline const bool wr_file_rotate() const { return is_wr_full(); }
@@ -131,8 +131,8 @@
const std::string& status_str(std::string& s) const;
protected:
- virtual void open() throw (jexception);
- virtual void close();
+ virtual void open_fh() throw (jexception);
+ virtual void close_fh();
};
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -151,31 +151,16 @@
}
// 2. Allocate array of page pointers
_page_ptr_arr = (void**)::malloc(_pages * sizeof(void*));
- if (_page_ptr_arr == NULL)
- {
- clean();
- ss << "_page_ptr_arr malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "pmgr", "initialize");
- }
+ MALLOC_CHK(_page_ptr_arr, "_page_ptr_arr", "pmgr", "initialize");
// 3. Allocate and initialize page control block (page_cb) array
_page_cb_arr = (page_cb*)::malloc(_pages * sizeof(page_cb));
- if (_page_cb_arr == NULL)
- {
- clean();
- ss << "_page_cb_arr malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "pmgr", "initialize");
- }
+ MALLOC_CHK(_page_cb_arr, "_page_cb_arr", "pmgr", "initialize");
::memset(_page_cb_arr, 0, _pages * sizeof(page_cb));
// 5. Allocate IO control block (iocb) array
_iocb_arr = (iocb*)::malloc(_pages * sizeof(iocb));
- if (_iocb_arr == NULL)
- {
- clean();
- ss << "_iocb_arr malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "pmgr", "initialize");
- }
+ MALLOC_CHK(_iocb_arr, "_iocb_arr", "pmgr", "initialize");
// 6. Set page pointers in _page_ptr_arr, _page_cb_arr and iocbs to pages within page block
for (u_int16_t i=0; i<_pages; i++)
@@ -192,12 +177,7 @@
// 7. Allocate io_event array, max one event per cache page in rmgr and wmgr
const u_int16_t max_aio_evts = JRNL_RMGR_PAGES + JRNL_WMGR_PAGES;
_ioevt_arr = (io_event*)::malloc(max_aio_evts * sizeof(io_event));
- if (_ioevt_arr == NULL)
- {
- clean();
- ss << "_ioevt_arr malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "pmgr", "initialize");
- }
+ MALLOC_CHK(_ioevt_arr, "_ioevt_arr", "pmgr", "initialize");
// 8. Initialize AIO context
if (int ret = ::io_queue_init(max_aio_evts, &_ioctx))
@@ -207,23 +187,10 @@
}
}
-// bool
-// pmgr::rotate_page(page_state state)
-// {
-// _page_cb_arr[_pg_index]._state = state;
-// if (_pg_offset_dblks >= JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
-// {
-// _pg_offset_dblks = 0;
-// _pg_cntr++;
-// }
-// if (++_pg_index >= _pages)
-// _pg_index = 0;
-// return false;
-// }
-
void
pmgr::clean()
{
+ // clean up allocated memory here
if (_ioctx)
::io_queue_release(_ioctx);
if (_page_base_ptr)
Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -52,7 +52,7 @@
bool _full; ///< Journal is full
std::vector<u_int32_t> _enq_cnt_list; ///< Number enqueued records found for each file
- rcvdat():
+ rcvdat(u_int16_t num_jfiles):
_owi(false),
_empty(true),
_ffid(0),
@@ -61,7 +61,7 @@
_eo(0),
_h_rid(0),
_full(false),
- _enq_cnt_list(JRNL_NUM_FILES, 0)
+ _enq_cnt_list(num_jfiles, 0)
{}
void reset()
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -766,7 +766,7 @@
// This counter is for bookkeeping only, page rotates are handled directly in init_aio_reads()
// FIXME: _pg_cntr should be sync'd with aio ops, not use of page as it is now...
// Need to move reset into if (_rrfc.file_rotate()) above.
- if (_pg_cntr >= (JRNL_FILE_SIZE / JRNL_RMGR_PAGE_SIZE))
+ if (_pg_cntr >= (_jc->jfsize_sblks() / JRNL_RMGR_PAGE_SIZE))
_pg_cntr = 0;
}
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -58,7 +58,7 @@
class rmgr : public pmgr
{
private:
- rrfc& _rrfc; ///< Ref to write rotating file controller
+ rrfc& _rrfc; ///< Ref to read rotating file controller
hdr _hdr; ///< Header used to determine record type
public:
Modified: store/trunk/cpp/lib/jrnl/txn_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/txn_rec.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -61,7 +61,9 @@
{}
txn_rec::~txn_rec()
-{}
+{
+ clean();
+}
void
txn_rec::reset(const u_int32_t magic)
@@ -270,12 +272,7 @@
rd_cnt = _txn_hdr.size();
chk_hdr();
_buff = ::malloc(_txn_hdr._xidsize);
- if (_buff == NULL)
- {
- std::stringstream ss;
- ss << "_buff malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "txn_rec", "decode");
- }
+ MALLOC_CHK(_buff, "_buff", "txn_rec", "decode");
const u_int32_t hdr_xid_dblks = size_dblks(txn_hdr::size() + _txn_hdr._xidsize);
const u_int32_t hdr_xid_tail_dblks = size_dblks(txn_hdr::size() + _txn_hdr._xidsize +
rec_tail::size());
@@ -339,12 +336,7 @@
ifsp->ignore(sizeof(u_int32_t)); // _filler0
#endif
_buff = ::malloc(_txn_hdr._xidsize);
- if (_buff == NULL)
- {
- std::stringstream ss;
- ss << "_buff malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "deq_rec", "rcv_decode");
- }
+ MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode");
// Decode xid
ifsp->read((char*)_buff, _txn_hdr._xidsize);
size_t size_read = ifsp->gcount();
@@ -427,5 +419,11 @@
jrec::chk_tail(_txn_tail, _txn_hdr._hdr);
}
+void
+txn_rec::clean()
+{
+ // clean up allocated memory here
+}
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/txn_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/txn_rec.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -90,6 +90,7 @@
void chk_hdr() const throw (jexception);
void chk_hdr(u_int64_t rid) const throw (jexception);
void chk_tail() const throw (jexception);
+ virtual void clean();
}; // class txn_rec
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -35,6 +35,7 @@
#include <assert.h>
#include <cerrno>
#include <sstream>
+#include <jrnl/jcntl.hpp>
#include <jrnl/jerrno.hpp>
namespace rhm
@@ -51,6 +52,8 @@
_fhdr_ptr_arr(NULL),
_iocba(NULL),
_cached_offset_dblks(0),
+ _jfsize_dblks(_jc->jfsize_sblks() * JRNL_SBLK_SIZE),
+ _jfsize_pgs(_jc->jfsize_sblks() / JRNL_WMGR_PAGE_SIZE),
_enq_busy(false),
_deq_busy(false),
_abort_busy(false),
@@ -68,6 +71,8 @@
_fhdr_ptr_arr(NULL),
_iocba(NULL),
_cached_offset_dblks(0),
+ _jfsize_dblks(_jc->jfsize_sblks() * JRNL_SBLK_SIZE),
+ _jfsize_pgs(_jc->jfsize_sblks() / JRNL_WMGR_PAGE_SIZE),
_enq_busy(false),
_deq_busy(false),
_abort_busy(false),
@@ -182,8 +187,8 @@
if (_wrfc.empty())
{
u_int32_t rec_dblks_rem = _enq_rec.rec_size_dblks() - data_offs_dblks;
- bool file_fit = rec_dblks_rem <= JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
- bool file_full = rec_dblks_rem == JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
+ bool file_fit = rec_dblks_rem <= _jfsize_dblks;
+ bool file_full = rec_dblks_rem == _jfsize_dblks;
size_t fro = 0;
if (cont)
{
@@ -209,7 +214,7 @@
}
// File full?
- if (_pg_cntr >= (JRNL_FILE_SIZE / JRNL_WMGR_PAGE_SIZE))
+ if (_pg_cntr >= _jfsize_pgs)
{
iores rfres = rotate_file();
if (rfres != RHM_IORES_SUCCESS)
@@ -332,8 +337,8 @@
if (_wrfc.empty())
{
u_int32_t rec_dblks_rem = _deq_rec.rec_size_dblks() - data_offs_dblks;
- bool file_fit = rec_dblks_rem <= JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
- bool file_full = rec_dblks_rem == JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
+ bool file_fit = rec_dblks_rem <= _jfsize_dblks;
+ bool file_full = rec_dblks_rem == _jfsize_dblks;
size_t fro = 0;
if (cont)
{
@@ -359,7 +364,7 @@
}
// File full?
- if (_pg_cntr >= (JRNL_FILE_SIZE / JRNL_WMGR_PAGE_SIZE))
+ if (_pg_cntr >= _jfsize_pgs)
{
iores rfres = rotate_file();
if (rfres != RHM_IORES_SUCCESS)
@@ -478,8 +483,8 @@
if (_wrfc.empty())
{
u_int32_t rec_dblks_rem = _txn_rec.rec_size_dblks() - data_offs_dblks;
- bool file_fit = rec_dblks_rem <= JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
- bool file_full = rec_dblks_rem == JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
+ bool file_fit = rec_dblks_rem <= _jfsize_dblks;
+ bool file_full = rec_dblks_rem == _jfsize_dblks;
size_t fro = 0;
if (cont)
{
@@ -505,7 +510,7 @@
}
// File full?
- if (_pg_cntr >= (JRNL_FILE_SIZE / JRNL_WMGR_PAGE_SIZE))
+ if (_pg_cntr >= _jfsize_pgs)
{
iores rfres = rotate_file();
if (rfres != RHM_IORES_SUCCESS)
@@ -620,8 +625,8 @@
if (_wrfc.empty())
{
u_int32_t rec_dblks_rem = _txn_rec.rec_size_dblks() - data_offs_dblks;
- bool file_fit = rec_dblks_rem <= JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
- bool file_full = rec_dblks_rem == JRNL_FILE_SIZE * JRNL_SBLK_SIZE;
+ bool file_fit = rec_dblks_rem <= _jfsize_dblks;
+ bool file_full = rec_dblks_rem == _jfsize_dblks;
size_t fro = 0;
if (cont)
{
@@ -647,7 +652,7 @@
}
// File full?
- if (_pg_cntr >= (JRNL_FILE_SIZE / JRNL_WMGR_PAGE_SIZE))
+ if (_pg_cntr >= _jfsize_pgs)
{
iores rfres = rotate_file();
if (rfres != RHM_IORES_SUCCESS)
@@ -676,7 +681,7 @@
wmgr::flush()
{
iores res = write_flush();
- if (_pg_cntr >= (JRNL_FILE_SIZE / JRNL_WMGR_PAGE_SIZE))
+ if (_pg_cntr >= _jfsize_pgs)
{
iores rfres = rotate_file();
if (rfres != RHM_IORES_SUCCESS)
@@ -886,8 +891,9 @@
void
wmgr::initialize() throw (jexception)
{
+ const u_int16_t num_jfiles = _jc->num_jfiles();
pmgr::initialize();
- if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize * JRNL_NUM_FILES))
+ if (::posix_memalign(&_fhdr_base_ptr, _sblksize, _sblksize * num_jfiles))
{
clean();
std::stringstream ss;
@@ -896,24 +902,12 @@
throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "wmgr", "initialize");
}
_fhdr_ptr_arr = (void**)::malloc(_pages * sizeof(void*));
- if (_fhdr_ptr_arr == NULL)
+ MALLOC_CHK(_fhdr_ptr_arr, "_fhdr_ptr_arr", "wmgr", "initialize");
+ _iocba = (iocb**)::malloc(sizeof(iocb*) * num_jfiles);
+ MALLOC_CHK(_iocba, "_iocba", "wmgr", "initialize");
+ ::memset(_iocba, 0, sizeof(iocb*) * num_jfiles);
+ for (u_int16_t i=0; i<num_jfiles; i++)
{
- clean();
- std::stringstream ss;
- ss << "_fhdr_ptr_arr malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "wmgr", "initialize");
- }
- _iocba = (iocb**)::malloc(sizeof(iocb*) * JRNL_NUM_FILES);
- if (_iocba == NULL)
- {
- clean();
- std::stringstream ss;
- ss << "_iocba malloc(): errno=" << errno;
- throw jexception(jerrno::JERR__MALLOC, ss.str().c_str(), "wmgr", "initialize");
- }
- ::memset(_iocba, 0, sizeof(iocb*) * JRNL_NUM_FILES);
- for (u_int16_t i=0; i<JRNL_NUM_FILES; i++)
- {
_fhdr_ptr_arr[i] = (void*)((char*)_fhdr_base_ptr + _sblksize * i);
_iocba[i] = ::new iocb;
}
@@ -1044,6 +1038,7 @@
void
wmgr::clean()
{
+ // clean up allocated memory here
if (_fhdr_base_ptr)
{
::free(_fhdr_base_ptr);
@@ -1056,7 +1051,7 @@
}
if (_iocba)
{
- for (u_int32_t i=0; i<JRNL_NUM_FILES; i++)
+ for (u_int32_t i=0; i<_jc->num_jfiles(); i++)
if (_iocba[i])
::delete _iocba[i];
::free(_iocba);
Modified: store/trunk/cpp/lib/jrnl/wmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/lib/jrnl/wmgr.hpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -77,6 +77,8 @@
iocb** _iocba; ///< Array of iocb pointers for file header writes
u_int32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks)
std::deque<data_tok*> _ddtokl; ///< Deferred dequeue data_tok list
+ const u_int32_t _jfsize_dblks; ///< Journal file size in dblks (NOT sblks!)
+ const u_int32_t _jfsize_pgs; ///< Journal file size in cache pages
// TODO: Convert _enq_busy etc into a proper threadsafe lock
// TODO: Convert to enum? Are these encodes mutually exclusive?
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -39,6 +39,8 @@
#define AIO_SLEEP_TIME 1000
#define MSG_SIZE 100
#define XID_SIZE 64
+#define NUM_JFILES 4
+#define JFSIZE_SBLKS 96
using namespace std;
@@ -48,7 +50,7 @@
try
{
char* test_name = "InstantiationTest";
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
}
catch (const rhm::journal::jexception& e)
{
@@ -64,7 +66,7 @@
try
{
char* test_name = "InitializationTest";
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.initialize();
}
catch (const rhm::journal::jexception& e)
@@ -84,15 +86,15 @@
u_int64_t highest_rid;
char* test_name = "EmptyRecoverTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.initialize();
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
jc.recover_complete();
}
@@ -111,7 +113,7 @@
try
{
char* test_name = "EnqueueTest";
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.initialize();
// Non-txn
@@ -142,13 +144,13 @@
// Non-txn
char* test_name = "RecoverReadTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
@@ -162,7 +164,7 @@
// Txn
test_name = "TxnRecoverReadTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.initialize();
create_xid(xid, 1, XID_SIZE);
txn_list.push_back(xid);
@@ -171,7 +173,7 @@
txn_commit(&jc, xid);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
@@ -200,13 +202,13 @@
// Non-txn
char* test_name = "RecoveredReadTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
@@ -228,7 +230,7 @@
// Txn
test_name = "TxnRecoveredReadTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.initialize();
create_xid(xid, 2, XID_SIZE);
txn_list.push_back(xid);
@@ -237,7 +239,7 @@
txn_commit(&jc, xid);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
@@ -274,13 +276,13 @@
// Non-txn
char* test_name = "RecoveredDequeueTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.initialize();
for (int m=0; m<NUM_MSGS; m++)
enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
@@ -304,7 +306,7 @@
// Txn
test_name = "TxnRecoveredDequeueTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.initialize();
create_xid(xid, 3, XID_SIZE);
txn_list.push_back(xid);
@@ -313,7 +315,7 @@
txn_commit(&jc, xid);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
@@ -352,7 +354,7 @@
// Non-txn
char* test_name = "FlagsRecoverdTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.initialize();
// Transient msgs - should not recover
for (int m=0; m<NUM_MSGS; m++)
@@ -368,7 +370,7 @@
enq_extern_msg(&jc, false);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
// Recover non-transient msgs
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
@@ -421,7 +423,7 @@
// Txn
test_name = "TxnFlagsRecoverdTest";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.initialize();
create_xid(xid, 4, XID_SIZE);
txn_list.push_back(xid);
@@ -440,7 +442,7 @@
txn_commit(&jc, xid);
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
// Recover non-transient msgs
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
@@ -508,7 +510,7 @@
// Non-txn
char* test_name = "ComplexRecoveryTest1";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.initialize();
// Enqueue 2n, then dequeue first n msgs; check that only last n readable
@@ -528,7 +530,7 @@
}
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
// Check that only last n readable (as before)
@@ -565,7 +567,7 @@
// Txn
test_name = "TxnComplexRecoveryTest1";
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.initialize();
// Enqueue 2n, then dequeue first n msgs; check that only last n readable
@@ -592,7 +594,7 @@
}
}
{
- rhm::journal::jcntl jc(test_name, "jdata", test_name);
+ rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
jc.recover(txn_list, highest_rid);
// Check that only last n readable (as before)
Modified: store/trunk/cpp/tests/jrnl/unit_test_jdir.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/unit_test_jdir.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/tests/jrnl/unit_test_jdir.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -38,13 +38,16 @@
#include <iomanip>
#include <iostream>
#include <jrnl/file_hdr.hpp>
+#include <jrnl/jcfg.hpp>
#include <jrnl/jdir.hpp>
#include <jrnl/jerrno.hpp>
#include <jrnl/jexception.hpp>
#include <sys/stat.h>
+#define NUM_JFILES 4
+#define JFSIZE_SBLKS 128
+
#define ERRORSTR(e) ::strerror(ret) << " (" << ret << ")"
-#define NUM_JDAT_FILES 20
#define NUM_CLEAR_OPS 20
using namespace boost::unit_test;
@@ -281,7 +284,7 @@
void create_jrnl_fileset(const char* dirname, const char* base_filename)
{
create_jinf_file(dirname, base_filename);
- for (u_int32_t fid = 0; fid < NUM_JDAT_FILES; fid++)
+ for (u_int32_t fid = 0; fid < NUM_JFILES; fid++)
{
u_int64_t rid = 0x12340000 + (fid * 0x25);
create_jdat_file(dirname, base_filename, fid, rid);
@@ -293,7 +296,7 @@
{
std::stringstream fn;
fn << dirname << "/" << base_filename << ".";
- fn << std::setfill('0') << std::setw(4) << std::hex << fid << ".jdat";
+ fn << std::setfill('0') << std::hex << std::setw(4) << fid << ".jdat";
file_hdr fh(RHM_JDAT_FILE_MAGIC, RHM_JDAT_VERSION, 0, first_rid, fid, 0x200, true);
std::ofstream of(fn.str().c_str(), std::ofstream::out | std::ofstream::trunc);
if (!of.good())
@@ -306,7 +309,7 @@
{
timespec ts;
::clock_gettime(CLOCK_REALTIME, &ts);
- jinf ji("test journal id", dirname, base_filename, ts);
+ jinf ji("test journal id", dirname, base_filename, NUM_JFILES, JFSIZE_SBLKS, ts);
ji.write();
}
@@ -319,7 +322,7 @@
BOOST_CHECK_EQUAL(count_dir_contents(dirname, false, true), num_subdirs);
// Journal file count
- unsigned num_jrnl_files = jrnl_present ? NUM_JDAT_FILES + 1 : 0;
+ unsigned num_jrnl_files = jrnl_present ? NUM_JFILES + 1 : 0;
BOOST_CHECK_EQUAL(count_dir_contents(dirname, true, false), num_jrnl_files);
// Check journal files are present
Modified: store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp 2007-11-30 15:43:01 UTC (rev 1406)
+++ store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp 2007-11-30 22:57:44 UTC (rev 1407)
@@ -40,6 +40,9 @@
#include <jrnl/jexception.hpp>
#include <jrnl/jinf.hpp>
+#define NUM_JFILES 4
+#define JFSIZE_SBLKS 128
+
using namespace boost::unit_test;
using namespace rhm::journal;
@@ -66,7 +69,7 @@
void test_write_constructor()
{
::clock_gettime(CLOCK_REALTIME, &ts);
- jinf ji(jid, jdir, base_filename, ts);
+ jinf ji(jid, jdir, base_filename, NUM_JFILES, JFSIZE_SBLKS, ts);
BOOST_CHECK_EQUAL(ji.jver(), RHM_JDAT_VERSION);
BOOST_CHECK(ji.jid().compare(jid) == 0);
BOOST_CHECK(ji.jdir().compare(jdir) == 0);
@@ -74,8 +77,8 @@
timespec this_ts = ji.ts();
BOOST_CHECK_EQUAL(this_ts.tv_sec, ts.tv_sec);
BOOST_CHECK_EQUAL(this_ts.tv_nsec, ts.tv_nsec);
- BOOST_CHECK_EQUAL(ji.num_files(), (u_int16_t)JRNL_NUM_FILES);
- BOOST_CHECK_EQUAL(ji.fsize_sblks(), (u_int32_t)JRNL_FILE_SIZE);
+ BOOST_CHECK_EQUAL(ji.num_jfiles(), (u_int16_t)NUM_JFILES);
+ BOOST_CHECK_EQUAL(ji.jfsize_sblks(), (u_int32_t)JFSIZE_SBLKS);
BOOST_CHECK_EQUAL(ji.sblk_size_dblks(), (u_int16_t)JRNL_SBLK_SIZE);
BOOST_CHECK_EQUAL(ji.dblk_size(), (u_int32_t)JRNL_DBLK_SIZE);
BOOST_CHECK_EQUAL(ji.wmgr_page_size_dblks(), (u_int32_t)JRNL_WMGR_PAGE_SIZE);
@@ -97,8 +100,8 @@
const timespec this_ts = ji.ts();
BOOST_CHECK_EQUAL(this_ts.tv_sec, ts.tv_sec);
BOOST_CHECK_EQUAL(this_ts.tv_nsec, ts.tv_nsec);
- BOOST_CHECK_EQUAL(ji.num_files(), (u_int16_t)JRNL_NUM_FILES);
- BOOST_CHECK_EQUAL(ji.fsize_sblks(), (u_int32_t)JRNL_FILE_SIZE);
+ BOOST_CHECK_EQUAL(ji.num_jfiles(), (u_int16_t)NUM_JFILES);
+ BOOST_CHECK_EQUAL(ji.jfsize_sblks(), (u_int32_t)JFSIZE_SBLKS);
BOOST_CHECK_EQUAL(ji.sblk_size_dblks(), (u_int16_t)JRNL_SBLK_SIZE);
BOOST_CHECK_EQUAL(ji.dblk_size(), (u_int32_t)JRNL_DBLK_SIZE);
BOOST_CHECK_EQUAL(ji.wmgr_page_size_dblks(), (u_int32_t)JRNL_WMGR_PAGE_SIZE);
@@ -138,7 +141,7 @@
void test_analyze_linear_journal()
{
std::vector<std::string> jfiles;
- for (int i=0; i<JRNL_NUM_FILES; i++)
+ for (int i=0; i<NUM_JFILES; i++)
{
create_journal_filenames(jfiles);
@@ -160,11 +163,11 @@
void create_journal_filenames(std::vector<std::string>& jfiles)
{
- for (int fnum=0; fnum<JRNL_NUM_FILES; fnum++)
+ for (int fnum=0; fnum<NUM_JFILES; fnum++)
{
std::stringstream fn;
fn << jdir << "/" << base_filename << ".";
- fn << std::setfill('0') << std::setw(4) << fnum << "." << JRNL_DATA_EXTENSION;
+ fn << std::setfill('0') << std::hex << std::setw(4) << fnum << "." << JRNL_DATA_EXTENSION;
jfiles.push_back(fn.str());
}
}
@@ -176,7 +179,7 @@
file_hdr fh;
std::vector<std::string>::iterator itr;
u_int32_t fid = 0;
- u_int64_t rid = rid_offs + ((JRNL_NUM_FILES - min_fid_offs) % JRNL_NUM_FILES) * rid_incr;
+ u_int64_t rid = rid_offs + ((NUM_JFILES - min_fid_offs) % NUM_JFILES) * rid_incr;
for (itr=jfiles.begin(); itr<jfiles.end(); itr++)
{
@@ -200,7 +203,7 @@
of.close();
if (++fid == min_fid_offs)
- rid -= rid_incr * (JRNL_NUM_FILES - 1);
+ rid -= rid_incr * (NUM_JFILES - 1);
else
rid += rid_incr;
}
17 years
rhmessaging commits: r1406 - in mgmt: mint/python/mint and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-11-30 10:43:01 -0500 (Fri, 30 Nov 2007)
New Revision: 1406
Modified:
mgmt/cumin/python/cumin/__init__.py
mgmt/cumin/python/cumin/broker.py
mgmt/mint/python/mint/__init__.py
mgmt/mint/python/mint/schema.sql
Log:
Fixes my use of sqlobject to be more natural, using the joinColumn arg
to MultipleJoin.
Makes group, profile, and cluster associate with broker registrations,
not brokers. This is what we want, so someone can group and configure
brokers that are not yet up and represented in objects sent by a
managed broker instance out in the world.
Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py 2007-11-30 14:57:59 UTC (rev 1405)
+++ mgmt/cumin/python/cumin/__init__.py 2007-11-30 15:43:01 UTC (rev 1406)
@@ -41,7 +41,7 @@
self.model.sys = sys
for reg in BrokerRegistration.select():
- self.mint.addManagedBroker(reg.host, reg.port)
+ self.mint.addManagedBroker(reg.host, reg.port or 5672)
self.cumin_page = CuminPage(self, "cumin.html")
self.set_default_page(self.cumin_page)
Modified: mgmt/cumin/python/cumin/broker.py
===================================================================
--- mgmt/cumin/python/cumin/broker.py 2007-11-30 14:57:59 UTC (rev 1405)
+++ mgmt/cumin/python/cumin/broker.py 2007-11-30 15:43:01 UTC (rev 1406)
@@ -38,7 +38,7 @@
def get_items(self, session, model):
start, end = self.paginator.get_bounds(session)
- return list(Broker.select())[start:end]
+ return list(Broker.select()[start:end])
def do_process(self, session, model):
if self.submit.get(session):
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2007-11-30 14:57:59 UTC (rev 1405)
+++ mgmt/mint/python/mint/__init__.py 2007-11-30 15:43:01 UTC (rev 1406)
@@ -20,42 +20,29 @@
host = StringCol(length=1000, default=None)
port = SmallIntCol(default=None)
broker = ForeignKey("Broker", cascade="null", default=None)
+ groups = RelatedJoin("BrokerGroup")
+ cluster = ForeignKey("BrokerCluster", cascade="null", default=None)
+ profile = ForeignKey("BrokerProfile", cascade="null", default=None)
class BrokerGroup(SQLObject):
name = StringCol(length=1000, default=None)
- brokers = RelatedJoin("Broker")
+ brokers = RelatedJoin("BrokerRegistration")
-Broker.sqlmeta.addJoin(RelatedJoin("BrokerGroup", joinMethodName="groups"))
-
class BrokerCluster(SQLObject):
name = StringCol(length=1000, default=None)
- brokers = MultipleJoin("Broker")
+ brokers = MultipleJoin("BrokerRegistration", joinColumn="cluster_id")
-fk = ForeignKey("BrokerCluster", cascade="null",
- default=None, name="brokerCluster")
-Broker.sqlmeta.addColumn(fk)
-setattr(Broker, "cluster", fk)
-
class BrokerProfile(SQLObject):
name = StringCol(length=1000, default=None)
- brokers = MultipleJoin("Broker")
- properties = MultipleJoin("ConfigProperty")
+ brokers = MultipleJoin("BrokerRegistration", joinColumn="profile_id")
+ properties = MultipleJoin("ConfigProperty", joinColumn="profile_id")
-fk = ForeignKey("BrokerProfile", cascade="null", default=None,
- name="brokerProfile")
-Broker.sqlmeta.addColumn(fk)
-setattr(Broker, "profile", fk)
-
class ConfigProperty(SQLObject):
name = StringCol(length=1000, default=None)
value = StringCol(length=1000, default=None)
type = StringCol(length=1, default="s")
+ profile = ForeignKey("BrokerProfile", cascade="null", default=None)
-fk = ForeignKey("BrokerProfile", cascade="null", default=None,
- name="brokerProfile")
-ConfigProperty.sqlmeta.addColumn(fk)
-setattr(ConfigProperty, "profile", fk)
-
class OriginalIdDict:
def __init__(self):
self.idMap = dict()
Modified: mgmt/mint/python/mint/schema.sql
===================================================================
--- mgmt/mint/python/mint/schema.sql 2007-11-30 14:57:59 UTC (rev 1405)
+++ mgmt/mint/python/mint/schema.sql 2007-11-30 15:43:01 UTC (rev 1406)
@@ -7,6 +7,10 @@
id SERIAL PRIMARY KEY,
name VARCHAR(1000)
);
+CREATE TABLE broker_group_broker_registration (
+broker_group_id INT NOT NULL,
+broker_registration_id INT NOT NULL
+);
CREATE TABLE broker_profile (
id SERIAL PRIMARY KEY,
@@ -18,7 +22,9 @@
name VARCHAR(1000),
host VARCHAR(1000),
port SMALLINT,
- broker_id INT
+ broker_id INT,
+ cluster_id INT,
+ profile_id INT
);
CREATE TABLE config_property (
@@ -26,7 +32,7 @@
name VARCHAR(1000),
value VARCHAR(1000),
type VARCHAR(1),
- broker_profile_id INT
+ profile_id INT
);
CREATE TABLE binding (
@@ -72,14 +78,8 @@
initial_disk_page_size INT,
initial_pages_per_queue INT,
cluster_name VARCHAR(1000),
- version VARCHAR(1000),
- broker_cluster_id INT,
- broker_profile_id INT
+ version VARCHAR(1000)
);
-CREATE TABLE broker_broker_group (
-broker_id INT NOT NULL,
-broker_group_id INT NOT NULL
-);
CREATE TABLE broker_stats (
id SERIAL PRIMARY KEY,
@@ -376,8 +376,12 @@
ALTER TABLE broker_registration ADD CONSTRAINT broker_id_exists FOREIGN KEY (broker_id) REFERENCES broker (id) ON DELETE SET NULL;
-ALTER TABLE config_property ADD CONSTRAINT broker_profile_id_exists FOREIGN KEY (broker_profile_id) REFERENCES broker_profile (id) ON DELETE SET NULL;
+ALTER TABLE broker_registration ADD CONSTRAINT cluster_id_exists FOREIGN KEY (cluster_id) REFERENCES broker_cluster (id) ON DELETE SET NULL;
+ALTER TABLE broker_registration ADD CONSTRAINT profile_id_exists FOREIGN KEY (profile_id) REFERENCES broker_profile (id) ON DELETE SET NULL;
+
+ALTER TABLE config_property ADD CONSTRAINT profile_id_exists FOREIGN KEY (profile_id) REFERENCES broker_profile (id) ON DELETE SET NULL;
+
ALTER TABLE binding ADD CONSTRAINT stats_curr_id_exists FOREIGN KEY (stats_curr_id) REFERENCES binding_stats (id) ON DELETE SET NULL;
ALTER TABLE binding ADD CONSTRAINT stats_prev_id_exists FOREIGN KEY (stats_prev_id) REFERENCES binding_stats (id) ON DELETE SET NULL;
@@ -394,10 +398,6 @@
ALTER TABLE broker ADD CONSTRAINT system_id_exists FOREIGN KEY (system_id) REFERENCES system (id) ON DELETE SET NULL;
-ALTER TABLE broker ADD CONSTRAINT broker_cluster_id_exists FOREIGN KEY (broker_cluster_id) REFERENCES broker_cluster (id) ON DELETE SET NULL;
-
-ALTER TABLE broker ADD CONSTRAINT broker_profile_id_exists FOREIGN KEY (broker_profile_id) REFERENCES broker_profile (id) ON DELETE SET NULL;
-
ALTER TABLE broker_stats ADD CONSTRAINT broker_id_exists FOREIGN KEY (broker_id) REFERENCES broker (id) ON DELETE SET NULL;
ALTER TABLE client ADD CONSTRAINT stats_curr_id_exists FOREIGN KEY (stats_curr_id) REFERENCES client_stats (id) ON DELETE SET NULL;
17 years
rhmessaging commits: r1405 - store/trunk/cpp.
by rhmessaging-commits@lists.jboss.org
Author: nunofsantos
Date: 2007-11-30 09:57:59 -0500 (Fri, 30 Nov 2007)
New Revision: 1405
Modified:
store/trunk/cpp/mrg-grid.spec
store/trunk/cpp/mrg-messaging.spec
store/trunk/cpp/mrg.spec
Log:
make mrg-xxx specfiles 'noarch'
Modified: store/trunk/cpp/mrg-grid.spec
===================================================================
--- store/trunk/cpp/mrg-grid.spec 2007-11-29 23:03:10 UTC (rev 1404)
+++ store/trunk/cpp/mrg-grid.spec 2007-11-30 14:57:59 UTC (rev 1405)
@@ -6,6 +6,7 @@
Group: System Environment/Libraries
URL: http://redhat.com/mrg
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root
+BuildArch: noarch
Requires: condor
Modified: store/trunk/cpp/mrg-messaging.spec
===================================================================
--- store/trunk/cpp/mrg-messaging.spec 2007-11-29 23:03:10 UTC (rev 1404)
+++ store/trunk/cpp/mrg-messaging.spec 2007-11-30 14:57:59 UTC (rev 1405)
@@ -6,6 +6,7 @@
Group: System Environment/Libraries
URL: http://redhat.com/mrg
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root
+BuildArch: noarch
Requires: rhm
Requires: rhm-docs
Modified: store/trunk/cpp/mrg.spec
===================================================================
--- store/trunk/cpp/mrg.spec 2007-11-29 23:03:10 UTC (rev 1404)
+++ store/trunk/cpp/mrg.spec 2007-11-30 14:57:59 UTC (rev 1405)
@@ -6,6 +6,7 @@
Group: System Environment/Libraries
URL: http://redhat.com/mrg
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root
+BuildArch: noarch
Requires: mrg-messaging
Requires: mrg-realtime
17 years
rhmessaging commits: r1404 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-11-29 18:03:10 -0500 (Thu, 29 Nov 2007)
New Revision: 1404
Modified:
mgmt/cumin/python/cumin/broker.py
mgmt/cumin/python/cumin/broker.strings
mgmt/cumin/python/cumin/brokerprofile.py
Log:
Removes BrokerSet, long superseded by BrokerSetForm.
Modified: mgmt/cumin/python/cumin/broker.py
===================================================================
--- mgmt/cumin/python/cumin/broker.py 2007-11-29 22:57:09 UTC (rev 1403)
+++ mgmt/cumin/python/cumin/broker.py 2007-11-29 23:03:10 UTC (rev 1404)
@@ -15,13 +15,49 @@
strings = StringCatalog(__file__)
-class BrokerSet(ItemSet):
+class BrokerSetForm(ItemSet, Form):
+ def __init__(self, app, name):
+ super(BrokerSetForm, self).__init__(app, name)
+
+ self.broker = BrokerParameter(app, "param")
+ self.add_parameter(self.broker)
+ self.add_form_parameter(self.broker)
+
+ self.brokers = ListParameter(app, "id", self.broker)
+ self.add_parameter(self.brokers)
+ self.add_form_parameter(self.brokers)
+
+ self.submit = self.Submit(app, "submit", self)
+ self.add_child(self.submit)
+
+ self.paginator = self.BrokerPaginator(app, "page")
+ self.add_child(self.paginator)
+
def get_title(self, session, model):
return "Brokers %s" % fmt_count(len(model.get_brokers()))
def get_items(self, session, model):
- return sorted_by(model.brokers(), "id")
+ start, end = self.paginator.get_bounds(session)
+ return list(Broker.select())[start:end]
+ def do_process(self, session, model):
+ if self.submit.get(session):
+ self.submit.set(session, False)
+
+ for broker in self.brokers.get(session):
+ print "broker", broker
+
+ self.page().set_redirect_url(session, session.marshal())
+
+ def render_item_checkbox_name(self, session, broker):
+ return self.brokers.path()
+
+ def render_item_checkbox_value(self, session, broker):
+ return self.broker.marshal(broker)
+
+ def render_item_checkbox_checked_attr(self, session, broker):
+ return broker in self.brokers.get(session) and "checked=\"checked\""
+
def render_item_link(self, session, broker):
branch = session.branch()
self.page().show_broker(branch, broker).show_view(branch)
@@ -63,46 +99,6 @@
def render_item_load(self, session, broker):
return "%.2f" % random()
-class BrokerSetForm(BrokerSet, Form):
- def __init__(self, app, name):
- super(BrokerSetForm, self).__init__(app, name)
-
- self.broker = BrokerParameter(app, "param")
- self.add_parameter(self.broker)
- self.add_form_parameter(self.broker)
-
- self.brokers = ListParameter(app, "id", self.broker)
- self.add_parameter(self.brokers)
- self.add_form_parameter(self.brokers)
-
- self.submit = self.Submit(app, "submit", self)
- self.add_child(self.submit)
-
- self.paginator = self.BrokerPaginator(app, "page")
- self.add_child(self.paginator)
-
- def get_items(self, session, model):
- start, end = self.paginator.get_bounds(session)
- return list(Broker.select())[start:end]
-
- def do_process(self, session, model):
- if self.submit.get(session):
- self.submit.set(session, False)
-
- for broker in self.brokers.get(session):
- print "broker", broker
-
- self.page().set_redirect_url(session, session.marshal())
-
- def render_item_checkbox_name(self, session, broker):
- return self.brokers.path()
-
- def render_item_checkbox_value(self, session, broker):
- return self.broker.marshal(broker)
-
- def render_item_checkbox_checked_attr(self, session, broker):
- return broker in self.brokers.get(session) and "checked=\"checked\""
-
class Submit(FormButton):
def render_content(self, session, model):
return "Submit"
Modified: mgmt/cumin/python/cumin/broker.strings
===================================================================
--- mgmt/cumin/python/cumin/broker.strings 2007-11-29 22:57:09 UTC (rev 1403)
+++ mgmt/cumin/python/cumin/broker.strings 2007-11-29 23:03:10 UTC (rev 1404)
@@ -1,23 +1,3 @@
-[BrokerSet.html]
-<table class="mobjects">
- <tr>
- <th>Name</th>
- <th>Profile</th>
- <th>Cluster</th>
- <th>Status</th>
- </tr>
-
- {items}
-</table>
-
-[BrokerSet.item_html]
-<tr>
- <td>{item_link}</td>
- <td>{item_profile_link}</td>
- <td>{item_cluster_link}</td>
- <td>{item_status}</td>
-</tr>
-
[BrokerSetForm.html]
<form id="{id}" method="post" action="?">
<!-- <select onchange="document.getElementById('{id}.submit').submit()"> -->
Modified: mgmt/cumin/python/cumin/brokerprofile.py
===================================================================
--- mgmt/cumin/python/cumin/brokerprofile.py 2007-11-29 22:57:09 UTC (rev 1403)
+++ mgmt/cumin/python/cumin/brokerprofile.py 2007-11-29 23:03:10 UTC (rev 1404)
@@ -74,7 +74,7 @@
def get_title(self, session, profile):
return "Configuration"
- class ProfileBrokerTab(BrokerSet):
+ class ProfileBrokerTab(BrokerSetForm):
def __init__(self, app, name):
super(BrokerProfileView.ProfileBrokerTab, self).__init__(app, name)
17 years
rhmessaging commits: r1403 - in mgmt/cumin: python/cumin and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-11-29 17:57:09 -0500 (Thu, 29 Nov 2007)
New Revision: 1403
Modified:
mgmt/cumin/bin/cumin-test
mgmt/cumin/python/cumin/__init__.py
mgmt/cumin/python/cumin/broker.py
mgmt/cumin/python/cumin/broker.strings
Log:
Make broker registration work, and use reg entries to actually connect
to running brokers.
Modified: mgmt/cumin/bin/cumin-test
===================================================================
--- mgmt/cumin/bin/cumin-test 2007-11-29 22:16:50 UTC (rev 1402)
+++ mgmt/cumin/bin/cumin-test 2007-11-29 22:57:09 UTC (rev 1403)
@@ -8,11 +8,9 @@
Options:
--port PORT
--data DATABASE-URL
- --broker BROKER
--bench [HITS]
--profile
--no-debug
- --no-demo-data
""",
sys.exit(1)
@@ -66,13 +64,10 @@
from cumin.demo import *
from cumin.model import *
-def do_main(port, broker, bench_hits, debug=True):
+def do_main(port, bench_hits, debug=True):
model = DummyModel()
app = Cumin(model)
- if broker:
- app.add_managed_broker(broker)
-
if debug or bench_hits:
app.enable_debug()
@@ -84,7 +79,6 @@
server.run()
def main():
- in_broker = args.get("broker")
in_port = int(args.get("port", 9090))
in_profile = "profile" in args
in_debug = "no-debug" not in args
@@ -121,7 +115,7 @@
stats.strip_dirs()
else:
try:
- do_main(in_port, in_broker, in_bench, in_debug)
+ do_main(in_port, in_bench, in_debug)
except KeyboardInterrupt:
pass
Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py 2007-11-29 22:16:50 UTC (rev 1402)
+++ mgmt/cumin/python/cumin/__init__.py 2007-11-29 22:57:09 UTC (rev 1403)
@@ -40,6 +40,9 @@
self.model.sys = sys
+ for reg in BrokerRegistration.select():
+ self.mint.addManagedBroker(reg.host, reg.port)
+
self.cumin_page = CuminPage(self, "cumin.html")
self.set_default_page(self.cumin_page)
@@ -54,10 +57,6 @@
self.add_page(ClientXmlPage(self, "client.xml"))
self.add_page(ClientChartPage(self, "client.png"))
- def add_managed_broker(self, broker):
- host, port = broker.split(":")
- self.mint.addManagedBroker(host, int(port))
-
class CuminServer(WebServer):
def __init__(self, port=9090):
model = DummyModel()
Modified: mgmt/cumin/python/cumin/broker.py
===================================================================
--- mgmt/cumin/python/cumin/broker.py 2007-11-29 22:16:50 UTC (rev 1402)
+++ mgmt/cumin/python/cumin/broker.py 2007-11-29 22:57:09 UTC (rev 1403)
@@ -1,4 +1,4 @@
-import mint
+from mint import *
from wooly import *
from wooly.widgets import *
from random import random
@@ -83,7 +83,7 @@
def get_items(self, session, model):
start, end = self.paginator.get_bounds(session)
- return list(mint.Broker.select())[start:end]
+ return list(Broker.select())[start:end]
def do_process(self, session, model):
if self.submit.get(session):
@@ -109,7 +109,7 @@
class BrokerPaginator(Paginator):
def get_object(self, session, model):
- return list(mint.Broker.select()) #XXX ugh
+ return list(Broker.select()) #XXX ugh
class BrokerFrame(CuminFrame):
def __init__(self, app, name):
@@ -570,14 +570,25 @@
self.page().set_redirect_url(session, branch.marshal())
def process_submit(self, session, model):
+ addrs = self.addrs.get(session)
names = self.names.get(session)
- addrs = self.addrs.get(session)
groups = self.groups.get(session)
- for name, addr, group in zip(names, addrs, groups):
- pass
- #broker = mint.Broker()
+ for i in range(0, len(addrs)):
+ addr = addrs[i]
+ if addr:
+ reg = BrokerRegistration()
+
+ elems = addr.split(":")
+
+ if len(elems) > 1:
+ reg.host, reg.port = elems[0], int(elems[1])
+ else:
+ reg.host = elems[0]
+
+ reg.name = names[i]
+
self.process_cancel(session, model)
class BrokerRemove(CuminConfirmForm):
Modified: mgmt/cumin/python/cumin/broker.strings
===================================================================
--- mgmt/cumin/python/cumin/broker.strings 2007-11-29 22:16:50 UTC (rev 1402)
+++ mgmt/cumin/python/cumin/broker.strings 2007-11-29 22:57:09 UTC (rev 1403)
@@ -19,7 +19,6 @@
</tr>
[BrokerSetForm.html]
-{page}
<form id="{id}" method="post" action="?">
<!-- <select onchange="document.getElementById('{id}.submit').submit()"> -->
17 years
rhmessaging commits: r1402 - in store/trunk/cpp: lib/jrnl and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-11-29 17:16:50 -0500 (Thu, 29 Nov 2007)
New Revision: 1402
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/StoreException.h
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/tests/jrnl/unit_test_jdir.cpp
store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp
Log:
BZ403201: Solves the problem of messages being prematurely deleted when an exception is thrown. This was achieved by extending the boost::intrusive_ptr<PersistableMessage> into data_token. Thus, as long as the data_token exists, the message will not be deleted. Also solved the St9exception message mystery, and tidied up exception handling, which was part of the cause of this mystery.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-29 21:46:43 UTC (rev 1401)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-29 22:16:50 UTC (rev 1402)
@@ -93,12 +93,12 @@
open(dequeueXidDb, txn.get(), "dequeue_xid.db", true);
open(prepareXidDb, txn.get(), "prepare_xid.db", false);
txn.commit();
- } catch (DbException& e) {
+ } catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error opening databases", e);
- } catch (std::exception& e) {
+ } catch (const std::exception& e) {
txn.abort();
- throw e;
+ throw;
}
ret = mode(async, force);
if (!ret) return false;
@@ -169,10 +169,10 @@
for (std::list<Db*>::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
}
- } catch (DbException& e) {
+ } catch (const DbException& e) {
THROW_STORE_EXCEPTION_2("Error closing databases", e);
- } catch (std::exception& e) {
- throw e;
+ } catch (const std::exception& e) {
+ throw;
}
}
@@ -190,7 +190,7 @@
try{
journal::jdir::delete_dir(getJrnlBaseDir(),true);
}
- catch ( journal::jexception& e) {
+ catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("truncate() failed: ") + e.what() );
}
}
@@ -207,7 +207,7 @@
try {
// init will create the deque's for the init...
jQueue->initialize();
- } catch (journal::jexception& e) {
+ } catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what());
}
}
@@ -216,7 +216,7 @@
if (!create(queueDb, queueIdSequence, queue)) {
THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
}
- } catch (DbException& e) {
+ } catch (const DbException& e) {
THROW_STORE_EXCEPTION_2("Error creating queue named " + queue.getName(), e);
}
}
@@ -246,7 +246,7 @@
if (!create(exchangeDb, exchangeIdSequence, exchange)) {
THROW_STORE_EXCEPTION("Exchange already exists: " + exchange.getName());
}
- } catch (DbException& e) {
+ } catch (const DbException& e) {
THROW_STORE_EXCEPTION_2("Error creating exchange named " + exchange.getName(), e);
}
}
@@ -334,7 +334,7 @@
txn.commit();
- } catch (DbException& e) {
+ } catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error on recovery", e);
}
@@ -546,7 +546,7 @@
assert( "Store Error: Unexpected msg state");
} // switch
} // while
- } catch (rhm::journal::jexception& e) {
+ } catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() +
": recoverMessages() failed: " + e.what());
}
@@ -737,9 +737,9 @@
store(NULL, &txn, key, msg, true);
msg->setPersistenceId(messageId);
txn.commit();
- } catch (std::exception& e) {
+ } catch (const std::exception& e) {
txn.abort();
- throw e;
+ throw;
}
}
}
@@ -754,7 +754,7 @@
try {
deleteIfUnused(txn.get(), key);
txn.commit();
- } catch (DbException& e) {
+ } catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error destroying message", e);
}
@@ -771,7 +771,7 @@
if (status != DB_BUFFER_SMALL) {
THROW_STORE_EXCEPTION("Unexpected status code when determining record length: " + string(DbEnv::strerror(status)));
}
- } catch (DbMemoryException& expected) {
+ } catch (const DbMemoryException& expected) {
//api doc indicates may throw exception instead of status = DB_BUFFER_SMALL;
}
return peek.get_size();
@@ -798,7 +798,7 @@
value.set_doff(offset);
value.set_dlen(size);
messageDb.put(0, &key, &value, DB_AUTO_COMMIT);
- } catch (DbException& e) {
+ } catch (const DbException& e) {
THROW_STORE_EXCEPTION_2("Error appending content", e);
}
} else {
@@ -836,9 +836,9 @@
data.assign(buffer, value.get_size());
delete [] buffer;
}
- } catch (DbException& e) {
+ } catch (const DbException& e) {
THROW_STORE_EXCEPTION_2("Error loading content", e);
- } catch (journal::jexception& e) {
+ } catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() +
": loadContent() failed: " + e.what());
}
@@ -856,7 +856,7 @@
if (jc){
jc->flush();
}
- }catch ( journal::jexception& e) {
+ }catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": flush() failed: " + e.what() );
}
}
@@ -908,9 +908,9 @@
}
if (!ctxt) txn->commit();
- } catch (std::exception& e) {
+ } catch (const std::exception& e) {
if (!ctxt) txn->abort();
- throw e;
+ throw;
}
}
@@ -936,7 +936,7 @@
//std::cout << "E" << std::flush;
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->ref();
- dtokp->setSourceMessage (message.get());
+ dtokp->setSourceMessage(message);
dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
bool written = false;
@@ -998,10 +998,10 @@
messageDb.put(txn->get(), &messageId, &data, DB_NOOVERWRITE);
}
}
- }catch ( journal::jexception& e) {
+ } catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: " +
e.what());
- }catch (DbException& e) {
+ } catch (const DbException& e) {
THROW_STORE_EXCEPTION_2("Error storing message", e);
}
}
@@ -1060,12 +1060,12 @@
}
if (!ctxt) txn->commit();
- } catch (DbException& e) {
+ } catch (const DbException& e) {
if (!ctxt) txn->abort();
THROW_STORE_EXCEPTION_2("Error dequeing message", e);
- } catch (std::exception& e) {
+ } catch (const std::exception& e) {
if (!ctxt) txn->abort();
- throw e;
+ throw;
}
}
@@ -1076,7 +1076,7 @@
bool written = false;
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
ddtokp->ref();
- ddtokp->setSourceMessage (msg.get());
+ ddtokp->setSourceMessage(msg);
ddtokp->set_rid(messageIdSequence.next());
ddtokp->set_dequeue_rid(msg->getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
@@ -1098,7 +1098,7 @@
} else {
dres = jc->dequeue_txn_data_record(ddtokp.get(), tid);
}
- } catch (rhm::journal::jexception& e) {
+ } catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
}
switch (dres)
@@ -1234,10 +1234,10 @@
prepareXidDb.del(txn.get(), &key, 0);
txn.complete(commit);
- } catch (std::exception& e) {
+ } catch (const std::exception& e) {
std::cout << "Error completing xid " << txn.getXid() << ": " << e.what() << std::endl;
txn.abort();
- throw e;
+ throw;
}
}
@@ -1279,9 +1279,9 @@
prepareXidDb.put(txn->get(), &key, &value, 0);
txn->commit();
- } catch (std::exception& e) {
+ } catch (const std::exception& e) {
txn->abort();
- throw e;
+ throw;
}
}
@@ -1333,7 +1333,7 @@
} else if (status) {
THROW_STORE_EXCEPTION(DbEnv::strerror(status));
}
- } catch (DbException& e) {
+ } catch (const DbException& e) {
THROW_STORE_EXCEPTION(e.what());
}
}
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-11-29 21:46:43 UTC (rev 1401)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-11-29 22:16:50 UTC (rev 1402)
@@ -99,7 +99,7 @@
}
catch (const jexception& e) {
if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
- throw e;
+ throw;
}
}
}
@@ -257,7 +257,7 @@
{
// Another thread has already called get_wr_events() and it is still busy, ignore
if (e.err_code() != jerrno::JERR__PTHREAD) {
- throw e;
+ throw;
}
}
}
Modified: store/trunk/cpp/lib/StoreException.h
===================================================================
--- store/trunk/cpp/lib/StoreException.h 2007-11-29 21:46:43 UTC (rev 1401)
+++ store/trunk/cpp/lib/StoreException.h 2007-11-29 22:16:50 UTC (rev 1402)
@@ -32,7 +32,7 @@
std::string text;
public:
StoreException(const std::string& _text) : text(_text) {}
- StoreException(const std::string& _text, DbException& cause) : text(_text + ": " + cause.what()) {}
+ StoreException(const std::string& _text, const DbException& cause) : text(_text + ": " + cause.what()) {}
virtual ~StoreException() throw() {}
virtual const char* what() const throw() { return text.c_str(); }
};
@@ -41,7 +41,7 @@
{
public:
StoreFullException(const std::string& _text) : StoreException(_text) {}
- StoreFullException(const std::string& _text, DbException& cause) : StoreException(_text, cause) {}
+ StoreFullException(const std::string& _text, const DbException& cause) : StoreException(_text, cause) {}
virtual ~StoreFullException() throw() {}
};
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2007-11-29 21:46:43 UTC (rev 1401)
+++ store/trunk/cpp/lib/TxnCtxt.h 2007-11-29 22:16:50 UTC (rev 1402)
@@ -75,7 +75,7 @@
{
jc->txn_abort(dtokp.get(), getXid());
}
- } catch (rhm::journal::jexception& e) {
+ } catch (const journal::jexception& e) {
//std::cout << "Error commit" << e << std::endl;
THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
}
@@ -115,7 +115,7 @@
allWritten = false;
jc->get_wr_events();
}
- }catch (rhm::journal::jexception& e) {
+ } catch (const journal::jexception& e) {
//std::cout << " TxnCtxt: Error sync 3" << e << std::flush;
THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
}
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-11-29 21:46:43 UTC (rev 1401)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-11-29 22:16:50 UTC (rev 1402)
@@ -49,7 +49,9 @@
}
}
+#include <boost/intrusive_ptr.hpp>
#include <pthread.h>
+#include <qpid/broker/PersistableMessage.h>
#include <sys/types.h>
#include <jrnl/jexception.hpp>
@@ -113,7 +115,7 @@
u_int64_t _rid; ///< RID of data set by enqueue operation
std::string _xid; ///< XID set by enqueue operation
u_int64_t _dequeue_rid; ///< RID of data set by dequeue operation
- qpid::broker::PersistableMessage* _sourceMsg; ///< Pointer back to source Message in Broker
+ boost::intrusive_ptr<qpid::broker::PersistableMessage> _sourceMsg; ///< Pointer back to source Message in Broker
public:
data_tok();
@@ -122,8 +124,10 @@
inline size_t refcnt(void) { return _ref_cnt;}
inline void ref(void) { _ref_cnt++; }
inline void unref(void) { _ref_cnt--; }
- inline qpid::broker::PersistableMessage* getSourceMessage(){return _sourceMsg;}
- inline void setSourceMessage(qpid::broker::PersistableMessage* msg) {_sourceMsg = msg;}
+ inline boost::intrusive_ptr<qpid::broker::PersistableMessage>& getSourceMessage()
+ { return _sourceMsg; }
+ inline void setSourceMessage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
+ { _sourceMsg = msg; }
inline const u_int64_t id() const { return _icnt; }
inline const write_state wstate() const { return _wstate; }
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-11-29 21:46:43 UTC (rev 1401)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-11-29 22:16:50 UTC (rev 1402)
@@ -209,7 +209,7 @@
res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, NULL, 0, transient,
false);
}
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw e; }
+ catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
return res;
}
@@ -225,7 +225,7 @@
{
res = _wmgr.enqueue(NULL, tot_data_len, 0, dtokp, NULL, 0, transient, true);
}
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw e; }
+ catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
return res;
}
@@ -243,7 +243,7 @@
res = _wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
transient, false);
}
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw e; }
+ catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
return res;
}
@@ -259,7 +259,7 @@
{
res = _wmgr.enqueue(NULL, tot_data_len, 0, dtokp, xid.data(), xid.size(), transient, true);
}
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw e; }
+ catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
return res;
}
@@ -294,7 +294,7 @@
check_wstatus("dequeue_data");
pthread_mutex_lock(&_mutex);
try { res = _wmgr.dequeue(dtokp, NULL, 0); }
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw e; }
+ catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
return res;
}
@@ -306,7 +306,7 @@
check_wstatus("dequeue_data");
pthread_mutex_lock(&_mutex);
try { res = _wmgr.dequeue(dtokp, xid.data(), xid.size()); }
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw e; }
+ catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
return res;
}
@@ -318,7 +318,7 @@
check_wstatus("txn_abort");
pthread_mutex_lock(&_mutex);
try { res = _wmgr.abort(dtokp, xid.data(), xid.size()); }
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw e; }
+ catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
return res;
}
@@ -330,7 +330,7 @@
check_wstatus("txn_commit");
pthread_mutex_lock(&_mutex);
try { res = _wmgr.commit(dtokp, xid.data(), xid.size()); }
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw e; }
+ catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
return res;
}
@@ -357,7 +357,7 @@
return 0; // already locked, return immediately
}
try { res = _wmgr.get_events(pmgr::UNUSED); }
- catch (const std::exception& e) { pthread_mutex_unlock(&_mutex); throw e; }
+ catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
return res;
}
@@ -393,7 +393,7 @@
throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl", "flush");
pthread_mutex_lock(&_mutex);
try { _wmgr.flush(); }
- catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw e; }
+ catch(const std::exception& e) { pthread_mutex_unlock(&_mutex); throw; }
pthread_mutex_unlock(&_mutex);
}
@@ -461,8 +461,7 @@
}
catch (const jexception& e)
{
- if (e.err_code() != jerrno::JERR_JINF_JDATEMPTY)
- throw e;
+ if (e.err_code() != jerrno::JERR_JINF_JDATEMPTY) throw;
}
// Restore all read and write pointers and transactions
@@ -550,8 +549,7 @@
try { _emap.lock(dr.deq_rid()); }
catch(const jexception& e)
{
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
- throw e;
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
}
dr.get_xid(&xidp);
assert(xidp != NULL);
@@ -567,10 +565,9 @@
//std::cout << enq_fid;
rd._enq_cnt_list[enq_fid]--;
}
- catch (const jexception& e)
+ catch(const jexception& e)
{
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
- throw e;
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
}
}
}
@@ -601,8 +598,7 @@
}
catch(const jexception& e)
{
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
- throw e;
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
}
if (itr->_enq_flag)
rd._enq_cnt_list[itr->_fid]--;
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-11-29 21:46:43 UTC (rev 1401)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-11-29 22:16:50 UTC (rev 1402)
@@ -290,11 +290,11 @@
//std::cout << ":ok" << std::flush;
is_enq = true;
}
- catch (jexception& e)
+ catch (const jexception& e)
{
// Block read for transactionally locked record (only when not recovering)
if (e.err_code() == jerrno::JERR_MAP_LOCKED && !_jc->is_read_only())
- throw e;
+ throw;
//if (e.err_code() == jerrno::JERR_MAP_LOCKED) std::cout << ":locked" << std::flush;
// (Recover mode only) Ok, not in emap - now search tmap, if present then read
@@ -333,10 +333,9 @@
if (_emap.is_locked(_hdr._rid) && !_jc->is_read_only())
return RHM_IORES_TXPENDING;
}
- catch (jexception e)
+ catch (const jexception& e)
{
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
- throw e;
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
}
#endif
if (dtokp->rid())
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-11-29 21:46:43 UTC (rev 1401)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-11-29 22:16:50 UTC (rev 1402)
@@ -311,7 +311,10 @@
{
// If the enqueue is part of a pending txn, it will not yet be in emap
try { _emap.lock(dequeue_rid); }
- catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
+ catch(const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+ }
std::string xid((char*)xid_ptr, xid_len);
_tmap.insert_txn_data(xid, txn_data(rid, dequeue_rid, dtokp->fid(), false));
}
@@ -452,7 +455,10 @@
if (!itr->_enq_flag)
_emap.unlock(itr->_drid);
}
- catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
+ catch(const jexception& e)
+ {
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+ }
if (itr->_enq_flag)
_wrfc.decr_enqcnt(itr->_fid);
}
@@ -866,10 +872,9 @@
bool is_synced = true;
// Check for outstanding enqueues/dequeues
try { is_synced = _tmap.is_txn_synced(xid); }
- catch (jexception e)
+ catch (const jexception& e)
{
- if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
- throw e;
+ if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
}
if (!is_synced)
return false;
Modified: store/trunk/cpp/tests/jrnl/unit_test_jdir.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/unit_test_jdir.cpp 2007-11-29 21:46:43 UTC (rev 1401)
+++ store/trunk/cpp/tests/jrnl/unit_test_jdir.cpp 2007-11-29 22:16:50 UTC (rev 1402)
@@ -126,7 +126,7 @@
jdir::is_dir("/tmp/D");
BOOST_ERROR("jdir::is_dir() failed to throw jexeption for non-existent directory.");
}
- catch(const jexception e)
+ catch(const jexception& e)
{
BOOST_CHECK_EQUAL(e.err_code(), jerrno::JERR_JDIR_STAT);
}
@@ -243,7 +243,7 @@
test_dir_3.clear_dir(false);
BOOST_ERROR("jdir::clear_dir(flase) failed to throw jexeption for non-existent directory.");
}
- catch(const jexception e)
+ catch(const jexception& e)
{
BOOST_CHECK_EQUAL(e.err_code(), jerrno::JERR_JDIR_OPENDIR);
}
Modified: store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp 2007-11-29 21:46:43 UTC (rev 1401)
+++ store/trunk/cpp/tests/jrnl/unit_test_jinf.cpp 2007-11-29 22:16:50 UTC (rev 1402)
@@ -126,7 +126,7 @@
fn << jdir << "/" << base_filename << "." << JRNL_INFO_EXTENSION;
jinf ji(fn.str(), false);
try { ji.analyze(); }
- catch (jexception e)
+ catch (const jexception& e)
{
if (e.err_code() != jerrno::JERR_JINF_JDATEMPTY)
BOOST_ERROR("Failed to throw expected exception jerrno::JERR_JINF_JDATEMPTY");
17 years
rhmessaging commits: r1401 - in mgmt: mint/python/mint and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-11-29 16:46:43 -0500 (Thu, 29 Nov 2007)
New Revision: 1401
Modified:
mgmt/cumin/python/cumin/broker.py
mgmt/cumin/python/cumin/brokercluster.py
mgmt/cumin/python/cumin/brokerprofile.py
mgmt/cumin/python/cumin/page.py
mgmt/mint/python/mint/__init__.py
mgmt/mint/python/mint/schema.sql
Log:
Adds missing associations to management-side model objects.
Makes the UI use them properly.
Modified: mgmt/cumin/python/cumin/broker.py
===================================================================
--- mgmt/cumin/python/cumin/broker.py 2007-11-29 21:27:40 UTC (rev 1400)
+++ mgmt/cumin/python/cumin/broker.py 2007-11-29 21:46:43 UTC (rev 1401)
@@ -376,6 +376,9 @@
self.brokers = self.BrowserBrokers(app, "brokers")
self.add_child(self.brokers)
+ def get_title(self, session, model):
+ return "Brokers %s" % fmt_count(Broker.select().count())
+
class BrowserBrokers(BrokerSetForm):
def get_items(self, session, model):
return super(BrokerBrowser.BrowserBrokers, self).get_items \
Modified: mgmt/cumin/python/cumin/brokercluster.py
===================================================================
--- mgmt/cumin/python/cumin/brokercluster.py 2007-11-29 21:27:40 UTC (rev 1400)
+++ mgmt/cumin/python/cumin/brokercluster.py 2007-11-29 21:46:43 UTC (rev 1401)
@@ -19,7 +19,7 @@
return branch.marshal()
def get_title(self, session, model):
- return "Broker Clusters %s" % fmt_count(len(BrokerCluster.select()))
+ return "Broker Clusters %s" % fmt_count(BrokerCluster.select().count())
def get_items(self, session, model):
return list(BrokerCluster.select())
Modified: mgmt/cumin/python/cumin/brokerprofile.py
===================================================================
--- mgmt/cumin/python/cumin/brokerprofile.py 2007-11-29 21:27:40 UTC (rev 1400)
+++ mgmt/cumin/python/cumin/brokerprofile.py 2007-11-29 21:46:43 UTC (rev 1401)
@@ -1,3 +1,4 @@
+from mint import *
from wooly import *
from wooly.widgets import *
@@ -5,6 +6,7 @@
from broker import *
from widgets import *
from parameters import *
+from formats import *
from util import *
strings = StringCatalog(__file__)
@@ -15,6 +17,9 @@
self.page().show_broker_profile_add(branch)
return branch.marshal()
+ def get_title(self, session, model):
+ return "Broker Profiles %s" % fmt_count(BrokerProfile.select().count())
+
def get_items(self, session, model):
return list(BrokerProfile.select())
Modified: mgmt/cumin/python/cumin/page.py
===================================================================
--- mgmt/cumin/python/cumin/page.py 2007-11-29 21:27:40 UTC (rev 1400)
+++ mgmt/cumin/python/cumin/page.py 2007-11-29 21:46:43 UTC (rev 1401)
@@ -164,10 +164,10 @@
def __init__(self, app, name):
super(MainView, self).__init__(app, name)
- self.add_tab(self.BrokerTab(app, "brokers"))
+ self.add_tab(BrokerBrowser(app, "brokers"))
self.add_tab(BrokerGroupSet(app, "groups"))
- self.add_tab(self.BrokerProfileTab(app, "profiles"))
- self.add_tab(self.BrokerClusterTab(app, "clusters"))
+ self.add_tab(BrokerProfileSet(app, "profiles"))
+ self.add_tab(BrokerClusterSet(app, "clusters"))
self.add_tab(self.TagTab(app, "tags"))
def show_broker_group(self, session, group):
@@ -178,20 +178,6 @@
def get_title(self, session, model):
return "Messaging"
- class BrokerTab(BrokerBrowser):
- def get_title(self, session, model):
- return "Brokers %s" % fmt_count(len(model.brokers()))
-
- class BrokerProfileTab(BrokerProfileSet):
- def get_title(self, session, model):
- return "Broker Profiles %s" % \
- fmt_count(len(model.get_broker_profiles()))
-
- class BrokerClusterTab(BrokerClusterSet):
- def get_title(self, session, model):
- return "Broker Clusters %s" % \
- fmt_count(len(model.get_broker_clusters()))
-
class TagTab(Widget):
def get_title(self, session, model):
return "Tags"
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2007-11-29 21:27:40 UTC (rev 1400)
+++ mgmt/mint/python/mint/__init__.py 2007-11-29 21:46:43 UTC (rev 1401)
@@ -15,21 +15,18 @@
except TypeError:
pass
-
class BrokerRegistration(SQLObject):
name = StringCol(length=1000, default=None)
host = StringCol(length=1000, default=None)
port = SmallIntCol(default=None)
broker = ForeignKey("Broker", cascade="null", default=None)
-
class BrokerGroup(SQLObject):
name = StringCol(length=1000, default=None)
- brokers = MultipleJoin("Broker")
+ brokers = RelatedJoin("Broker")
Broker.sqlmeta.addJoin(RelatedJoin("BrokerGroup", joinMethodName="groups"))
-
class BrokerCluster(SQLObject):
name = StringCol(length=1000, default=None)
brokers = MultipleJoin("Broker")
@@ -39,27 +36,26 @@
Broker.sqlmeta.addColumn(fk)
setattr(Broker, "cluster", fk)
-
class BrokerProfile(SQLObject):
name = StringCol(length=1000, default=None)
brokers = MultipleJoin("Broker")
-
-fk = ForeignKey("BrokerProfile", cascade="null",
- default=None, name="brokerProfile")
+ properties = MultipleJoin("ConfigProperty")
+
+fk = ForeignKey("BrokerProfile", cascade="null", default=None,
+ name="brokerProfile")
Broker.sqlmeta.addColumn(fk)
setattr(Broker, "profile", fk)
-
class ConfigProperty(SQLObject):
name = StringCol(length=1000, default=None)
value = StringCol(length=1000, default=None)
type = StringCol(length=1, default="s")
- profile = ForeignKey("BrokerProfile", cascade="null", default=None)
-BrokerProfile.sqlmeta.addJoin(MultipleJoin("ConfigProperty",
- joinMethodName="properties"))
+fk = ForeignKey("BrokerProfile", cascade="null", default=None,
+ name="brokerProfile")
+ConfigProperty.sqlmeta.addColumn(fk)
+setattr(ConfigProperty, "profile", fk)
-
class OriginalIdDict:
def __init__(self):
self.idMap = dict()
Modified: mgmt/mint/python/mint/schema.sql
===================================================================
--- mgmt/mint/python/mint/schema.sql 2007-11-29 21:27:40 UTC (rev 1400)
+++ mgmt/mint/python/mint/schema.sql 2007-11-29 21:46:43 UTC (rev 1401)
@@ -26,7 +26,7 @@
name VARCHAR(1000),
value VARCHAR(1000),
type VARCHAR(1),
- profile_id INT
+ broker_profile_id INT
);
CREATE TABLE binding (
@@ -376,7 +376,7 @@
ALTER TABLE broker_registration ADD CONSTRAINT broker_id_exists FOREIGN KEY (broker_id) REFERENCES broker (id) ON DELETE SET NULL;
-ALTER TABLE config_property ADD CONSTRAINT profile_id_exists FOREIGN KEY (profile_id) REFERENCES broker_profile (id) ON DELETE SET NULL;
+ALTER TABLE config_property ADD CONSTRAINT broker_profile_id_exists FOREIGN KEY (broker_profile_id) REFERENCES broker_profile (id) ON DELETE SET NULL;
ALTER TABLE binding ADD CONSTRAINT stats_curr_id_exists FOREIGN KEY (stats_curr_id) REFERENCES binding_stats (id) ON DELETE SET NULL;
17 years
rhmessaging commits: r1400 - mgmt/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: nunofsantos
Date: 2007-11-29 16:27:40 -0500 (Thu, 29 Nov 2007)
New Revision: 1400
Modified:
mgmt/mint/python/mint/__init__.py
Log:
Update the in-memory map of original_ids upon reading an existing object from the db.
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2007-11-29 20:45:22 UTC (rev 1399)
+++ mgmt/mint/python/mint/__init__.py 2007-11-29 21:27:40 UTC (rev 1400)
@@ -15,36 +15,41 @@
except TypeError:
pass
+
class BrokerRegistration(SQLObject):
name = StringCol(length=1000, default=None)
host = StringCol(length=1000, default=None)
port = SmallIntCol(default=None)
broker = ForeignKey("Broker", cascade="null", default=None)
+
class BrokerGroup(SQLObject):
name = StringCol(length=1000, default=None)
+ brokers = MultipleJoin("Broker")
Broker.sqlmeta.addJoin(RelatedJoin("BrokerGroup", joinMethodName="groups"))
-BrokerGroup.sqlmeta.addJoin(RelatedJoin("Broker", joinMethodName="brokers"))
+
class BrokerCluster(SQLObject):
name = StringCol(length=1000, default=None)
+ brokers = MultipleJoin("Broker")
fk = ForeignKey("BrokerCluster", cascade="null",
default=None, name="brokerCluster")
Broker.sqlmeta.addColumn(fk)
setattr(Broker, "cluster", fk)
-BrokerCluster.sqlmeta.addJoin(MultipleJoin("Broker", joinMethodName="brokers"))
+
class BrokerProfile(SQLObject):
name = StringCol(length=1000, default=None)
-
+ brokers = MultipleJoin("Broker")
+
fk = ForeignKey("BrokerProfile", cascade="null",
default=None, name="brokerProfile")
Broker.sqlmeta.addColumn(fk)
setattr(Broker, "profile", fk)
-BrokerProfile.sqlmeta.addJoin(MultipleJoin("Broker", joinMethodName="brokers"))
+
class ConfigProperty(SQLObject):
name = StringCol(length=1000, default=None)
value = StringCol(length=1000, default=None)
@@ -54,6 +59,7 @@
BrokerProfile.sqlmeta.addJoin(MultipleJoin("ConfigProperty",
joinMethodName="properties"))
+
class OriginalIdDict:
def __init__(self):
self.idMap = dict()
@@ -62,23 +68,33 @@
self.idMap[idOriginal] = obj
def getByOriginalId(self, objType, idOriginal, create=False, args={}):
+ obj = None
if (idOriginal in self.idMap):
#print "\n\n=============== %s %d found\n\n" % (objType.__name__, idOriginal)
- return self.idMap[idOriginal]
- elif (create):
- #print "\n\n=============== %s %d NOT found, creating\n\n" % (objType.__name__, idOriginal)
- obj = objType.__new__(objType)
- obj.__init__()
- self.idMap[idOriginal] = obj
- return obj
+ obj = self.idMap[idOriginal]
else:
- #print "\n\n=============== %s %d NOT found, NOT creating\n\n" % (objType.__name__, idOriginal)
- return None
+ try:
+ obj = eval("objType.selectBy(idOriginal=idOriginal)[:1][0]")
+ self.idMap[idOriginal] = obj
+ except:
+ if (create):
+ #print "\n\n=============== %s %d NOT found, creating\n\n" % (objType.__name__, idOriginal)
+ obj = objType.__new__(objType)
+ obj.__init__()
+ self.idMap[idOriginal] = obj
+ else:
+ print "\n\n=============== %s %d NOT found, NOT creating\n\n" % (objType.__name__, idOriginal)
+ #pass
+ else:
+ #print "\n\n=============== %s %d found AFTER QUERY\n\n" % (objType.__name__, idOriginal)
+ pass
+ return obj
def getByIndexAttrib(self, objType, indexAttrib, indexValue, create=False, args={}):
###FIX
return None
+
class ConnectedBroker:
def __init__(self, managedBroker):
self.managedBroker = managedBroker
@@ -116,11 +132,12 @@
def convertRefKey(self, k):
return k.replace("Ref", "")
- def findParentKey(self, d):
+ def findParentKeys(self, d):
+ keys = []
for key in d.keys():
if (key.endswith("Ref")):
- return key
- return ""
+ keys.append(key)
+ return keys
def configCallback(self, broker, objectName, list, timestamps):
self.log("\nCONFIG---------------------------------------------------")
@@ -131,17 +148,9 @@
d["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
d["creationTime"] = datetime.fromtimestamp(timestamps[1]/1000000000)
self.log(d)
- ###FIX
- if (objectName == "broker"):
- # needs special handling until schema is sendind info about systems
- d.pop("systemRef")
- # d["system"] = connectedBroker.getByOriginalId(System, 0)
- d["system"] = System.selectBy(idOriginal=0)[:1][0]
- connectedBroker.objs.set(0, d["system"])
- else:
- parentKey = self.findParentKey(d)
- d[self.convertRefKey(parentKey)] = connectedBroker.getByOriginalId(schema.Vhost, d.pop(parentKey))
- ###FIX
+ for parentKey in self.findParentKeys(d):
+ convertedKey = self.convertRefKey(parentKey)
+ d[convertedKey] = connectedBroker.getByOriginalId(schema.schemaNameToClassMap[convertedKey], d.pop(parentKey))
obj = connectedBroker.getByOriginalId(schema.schemaNameToClassMap[objectName], d["idOriginal"], create=True)
obj.set(**d)
self.log("END CONFIG---------------------------------------------------\n")
17 years
rhmessaging commits: r1399 - in mgmt: mint/python/mint and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-11-29 15:45:22 -0500 (Thu, 29 Nov 2007)
New Revision: 1399
Modified:
mgmt/cumin/python/cumin/brokercluster.py
mgmt/cumin/python/cumin/brokergroup.py
mgmt/cumin/python/cumin/brokergroup.strings
mgmt/cumin/python/cumin/brokerprofile.py
mgmt/cumin/python/cumin/parameters.py
mgmt/mint/python/mint/__init__.py
mgmt/mint/python/mint/schema.sql
Log:
Checkpoint commit of conversion to new management-side model objects.
Modified: mgmt/cumin/python/cumin/brokercluster.py
===================================================================
--- mgmt/cumin/python/cumin/brokercluster.py 2007-11-29 19:18:28 UTC (rev 1398)
+++ mgmt/cumin/python/cumin/brokercluster.py 2007-11-29 20:45:22 UTC (rev 1399)
@@ -1,3 +1,4 @@
+from mint import *
from wooly import *
from wooly.widgets import *
from random import random
@@ -18,11 +19,10 @@
return branch.marshal()
def get_title(self, session, model):
- return "Broker Clusters %s" \
- % fmt_count(len(model.get_broker_clusters()))
+ return "Broker Clusters %s" % fmt_count(len(BrokerCluster.select()))
def get_items(self, session, model):
- return sorted_by(model.get_broker_clusters())
+ return list(BrokerCluster.select())
def render_item_link(self, session, cluster):
branch = session.branch()
@@ -30,13 +30,13 @@
return fmt_olink(branch, cluster)
def render_item_config(self, session, cluster):
- count = len(cluster.broker_items())
+ count = len(cluster.brokers)
return "%i broker%s" % (count, ess(count))
def render_item_status(self, session, cluster):
writer = Writer()
- for broker in sorted_by(cluster.broker_items()):
+ for broker in sorted_by(cluster.brokers):
writer.write(fmt_ostatus(broker))
return writer.to_string()
@@ -95,11 +95,10 @@
class ClusterBrokerTab(BrokerSetForm):
def get_title(self, session, cluster):
- return "Brokers" #XXX %s" % fmt_count(len(cluster.broker_items()))
+ return "Brokers %s" % fmt_count(len(cluster.brokers))
def get_items(self, session, cluster):
- return sorted_by(self.app.model.sys.brokers, "id")
- #XXX return sorted_by(cluster.broker_items())
+ return cluster.brokers
class ClusterStatsTab(Widget):
def get_title(self, session, cluster):
@@ -113,11 +112,7 @@
self.add_child(self.cluster_name)
def process_cluster(self, session, cluster):
- cluster.lock()
- try:
- cluster.name = self.cluster_name.get(session)
- finally:
- cluster.unlock()
+ cluster.name = self.cluster_name.get(session)
branch = session.branch()
self.page().show_broker_cluster(branch, cluster).show_view(branch)
@@ -133,7 +128,7 @@
self.page().set_redirect_url(session, branch.marshal())
def process_submit(self, session, model):
- cluster = BrokerCluster(model)
+ cluster = BrokerCluster()
self.process_cluster(session, cluster)
class BrokerClusterEdit(BrokerClusterForm, Frame):
@@ -161,7 +156,7 @@
self.page().set_redirect_url(session, branch.marshal())
def process_submit(self, session, cluster):
- cluster.remove()
+ cluster.destroySelf()
branch = session.branch()
self.page().show_view(branch)
Modified: mgmt/cumin/python/cumin/brokergroup.py
===================================================================
--- mgmt/cumin/python/cumin/brokergroup.py 2007-11-29 19:18:28 UTC (rev 1398)
+++ mgmt/cumin/python/cumin/brokergroup.py 2007-11-29 20:45:22 UTC (rev 1399)
@@ -1,3 +1,4 @@
+from mint import *
from wooly import *
from wooly.widgets import *
@@ -2,3 +3,2 @@
from broker import BrokerSetForm
-from model import *
from widgets import *
@@ -12,7 +12,7 @@
class BrokerGroupSet(ItemSet):
def get_title(self, session, model):
- return "Broker Groups %s" % fmt_count(len(model.get_broker_groups()))
+ return "Broker Groups %s" % fmt_count(BrokerGroup.select().count())
def render_group_add_href(self, session, model):
branch = session.branch()
@@ -20,21 +20,21 @@
return branch.marshal()
def get_items(self, session, model):
- return sorted_by(model.get_broker_groups())
-
+ return list(BrokerGroup.select())
+
def render_item_link(self, session, group):
branch = session.branch()
self.page().show_broker_group(branch, group).show_view(branch)
return fmt_olink(branch, group)
def render_item_config(self, session, group):
- count = len(group.broker_items())
+ count = len(group.brokers)
return "%i broker%s" % (count, ess(count))
def render_item_status(self, session, group):
writer = Writer()
- for broker in sorted_by(group.broker_items()):
+ for broker in sorted_by(group.brokers):
writer.write(fmt_ostatus(broker))
return writer.to_string()
@@ -86,16 +86,12 @@
def render_name(self, session, group):
return group.name
- def render_category(self, session, group):
- return group.get_type().name
-
class GroupBrokerTab(BrokerSetForm):
def get_title(self, session, group):
- return "Brokers %s" % fmt_count(len(group.broker_items()))
+ return "Brokers %s" % fmt_count(len(group.brokers))
def get_items(self, session, group):
- return sorted_by(self.app.model.sys.brokers, "port")
- #XXX return sorted_by(group.broker_items())
+ return group.brokers
class BrokerGroupForm(CuminForm):
def __init__(self, app, name):
@@ -105,12 +101,7 @@
self.add_child(self.group_name)
def process_group(self, session, group):
- group.lock()
- try:
- group.name = self.group_name.get(session)
- group.set_type(self.app.model.get_broker_group_types()[0])
- finally:
- group.unlock()
+ group.name = self.group_name.get(session)
branch = session.branch()
self.page().show_broker_group(branch, group).show_view(branch)
@@ -126,7 +117,7 @@
self.page().set_redirect_url(session, branch.marshal())
def process_submit(self, session, model):
- group = BrokerGroup(model)
+ group = BrokerGroup()
self.process_group(session, group)
class BrokerGroupEdit(BrokerGroupForm, Frame):
@@ -164,7 +155,7 @@
self.page().set_redirect_url(session, branch.marshal())
def process_submit(self, session, group):
- group.remove()
+ group.destroySelf()
branch = session.branch()
self.page().show_view(branch)
Modified: mgmt/cumin/python/cumin/brokergroup.strings
===================================================================
--- mgmt/cumin/python/cumin/brokergroup.strings 2007-11-29 19:18:28 UTC (rev 1398)
+++ mgmt/cumin/python/cumin/brokergroup.strings 2007-11-29 20:45:22 UTC (rev 1399)
@@ -35,7 +35,6 @@
<table class="props">
<tr><th>Name</th><td>{name}</td></tr>
- <tr><th>Category</th><td>{category}</td></tr>
<tr>
<th class="actions" colspan="2">
<h2>Act on This Group:</h2>
Modified: mgmt/cumin/python/cumin/brokerprofile.py
===================================================================
--- mgmt/cumin/python/cumin/brokerprofile.py 2007-11-29 19:18:28 UTC (rev 1398)
+++ mgmt/cumin/python/cumin/brokerprofile.py 2007-11-29 20:45:22 UTC (rev 1399)
@@ -16,7 +16,7 @@
return branch.marshal()
def get_items(self, session, model):
- return sorted_by(model.get_broker_profiles())
+ return list(BrokerProfile.select())
def render_item_link(self, session, profile):
branch = session.branch()
@@ -64,7 +64,7 @@
class ProfileConfigTab(ConfigPropertySet):
def get_items(self, session, profile):
- return sorted_by(profile.config_property_items())
+ return sorted_by(profile.properties)
def get_title(self, session, profile):
return "Configuration"
@@ -74,11 +74,10 @@
super(BrokerProfileView.ProfileBrokerTab, self).__init__(app, name)
def get_title(self, session, profile):
- return "Brokers %s" % fmt_count(len(profile.broker_items()))
+ return "Brokers %s" % fmt_count(len(profile.brokers))
def get_items(self, session, profile):
- return sorted_by(self.app.model.sys.brokers, "id")
- #XXX return sorted_by(profile.broker_items())
+ return profile.brokers
def render_item_config_href(self, session, broker):
branch = session.branch()
@@ -91,7 +90,8 @@
diffs = 0
- for prop in broker.get_broker_profile().config_property_items():
+ for prop in broker.profile.properties:
+ # XXX need to add assoc for this
for iprop in broker.config_property_items():
if iprop.name == prop.name:
if iprop.value != prop.value:
@@ -107,11 +107,7 @@
self.add_child(self.profile_name)
def process_profile(self, session, profile):
- profile.lock()
- try:
- profile.name = self.profile_name.get(session)
- finally:
- profile.unlock()
+ profile.name = self.profile_name.get(session)
branch = session.branch()
self.page().show_broker_profile(branch, profile).show_view(branch)
@@ -127,7 +123,7 @@
self.page().set_redirect_url(session, branch.marshal())
def process_submit(self, session, model):
- profile = BrokerProfile(model)
+ profile = BrokerProfile()
self.process_profile(session, profile)
class BrokerProfileEdit(BrokerProfileForm, Frame):
@@ -155,7 +151,7 @@
self.page().set_redirect_url(session, branch.marshal())
def process_submit(self, session, profile):
- profile.remove()
+ profile.destroySelf()
branch = session.branch()
self.page().show_view(branch)
Modified: mgmt/cumin/python/cumin/parameters.py
===================================================================
--- mgmt/cumin/python/cumin/parameters.py 2007-11-29 19:18:28 UTC (rev 1398)
+++ mgmt/cumin/python/cumin/parameters.py 2007-11-29 20:45:22 UTC (rev 1399)
@@ -3,14 +3,14 @@
class BrokerClusterParameter(Parameter):
def do_unmarshal(self, string):
- return self.app.model.get_broker_cluster(int(string))
+ return BrokerCluster.get(int(string))
def do_marshal(self, cluster):
return str(cluster.id)
class BrokerGroupParameter(Parameter):
def do_unmarshal(self, string):
- return self.app.model.get_broker_group(int(string))
+ return BrokerGroup.get(int(string))
def do_marshal(self, group):
return str(group.id)
@@ -24,7 +24,7 @@
class BrokerProfileParameter(Parameter):
def do_unmarshal(self, string):
- return self.app.model.get_broker_profile(int(string))
+ return BrokerProfile.get(int(string))
def do_marshal(self, profile):
return str(profile.id)
@@ -57,16 +57,9 @@
def do_marshal(self, queue):
return str(queue.id)
-class RealmParameter(Parameter):
- def do_unmarshal(self, string):
- return self.app.model.get_realm(int(string))
-
- def do_marshal(self, queue):
- return str(realm.id)
-
class VirtualHostParameter(Parameter):
def do_unmarshal(self, string):
- return self.app.model.get_virtual_host(int(string))
+ return Vhost.get(int(string))
def do_marshal(self, vhost):
return str(vhost.id)
Modified: mgmt/mint/python/mint/__init__.py
===================================================================
--- mgmt/mint/python/mint/__init__.py 2007-11-29 19:18:28 UTC (rev 1398)
+++ mgmt/mint/python/mint/__init__.py 2007-11-29 20:45:22 UTC (rev 1399)
@@ -4,14 +4,14 @@
from mint import schema
-this_mod = __import__(__name__)
+this_module = __import__(__name__)
for item in dir(schema):
cls = getattr(schema, item)
try:
if issubclass(cls, SQLObject) and cls is not SQLObject:
- setattr(this_mod, item, cls)
+ setattr(this_module, item, cls)
except TypeError:
pass
@@ -24,23 +24,36 @@
class BrokerGroup(SQLObject):
name = StringCol(length=1000, default=None)
-BrokerGroup.sqlmeta.addJoin(MultipleJoin("Broker", joinMethodName="brokers"))
+Broker.sqlmeta.addJoin(RelatedJoin("BrokerGroup", joinMethodName="groups"))
+BrokerGroup.sqlmeta.addJoin(RelatedJoin("Broker", joinMethodName="brokers"))
class BrokerCluster(SQLObject):
name = StringCol(length=1000, default=None)
+fk = ForeignKey("BrokerCluster", cascade="null",
+ default=None, name="brokerCluster")
+Broker.sqlmeta.addColumn(fk)
+setattr(Broker, "cluster", fk)
BrokerCluster.sqlmeta.addJoin(MultipleJoin("Broker", joinMethodName="brokers"))
class BrokerProfile(SQLObject):
name = StringCol(length=1000, default=None)
+fk = ForeignKey("BrokerProfile", cascade="null",
+ default=None, name="brokerProfile")
+Broker.sqlmeta.addColumn(fk)
+setattr(Broker, "profile", fk)
BrokerProfile.sqlmeta.addJoin(MultipleJoin("Broker", joinMethodName="brokers"))
class ConfigProperty(SQLObject):
name = StringCol(length=1000, default=None)
value = StringCol(length=1000, default=None)
type = StringCol(length=1, default="s")
+ profile = ForeignKey("BrokerProfile", cascade="null", default=None)
+BrokerProfile.sqlmeta.addJoin(MultipleJoin("ConfigProperty",
+ joinMethodName="properties"))
+
class OriginalIdDict:
def __init__(self):
self.idMap = dict()
Modified: mgmt/mint/python/mint/schema.sql
===================================================================
--- mgmt/mint/python/mint/schema.sql 2007-11-29 19:18:28 UTC (rev 1398)
+++ mgmt/mint/python/mint/schema.sql 2007-11-29 20:45:22 UTC (rev 1399)
@@ -25,7 +25,8 @@
id SERIAL PRIMARY KEY,
name VARCHAR(1000),
value VARCHAR(1000),
- type VARCHAR(1)
+ type VARCHAR(1),
+ profile_id INT
);
CREATE TABLE binding (
@@ -71,8 +72,14 @@
initial_disk_page_size INT,
initial_pages_per_queue INT,
cluster_name VARCHAR(1000),
- version VARCHAR(1000)
+ version VARCHAR(1000),
+ broker_cluster_id INT,
+ broker_profile_id INT
);
+CREATE TABLE broker_broker_group (
+broker_id INT NOT NULL,
+broker_group_id INT NOT NULL
+);
CREATE TABLE broker_stats (
id SERIAL PRIMARY KEY,
@@ -369,6 +376,8 @@
ALTER TABLE broker_registration ADD CONSTRAINT broker_id_exists FOREIGN KEY (broker_id) REFERENCES broker (id) ON DELETE SET NULL;
+ALTER TABLE config_property ADD CONSTRAINT profile_id_exists FOREIGN KEY (profile_id) REFERENCES broker_profile (id) ON DELETE SET NULL;
+
ALTER TABLE binding ADD CONSTRAINT stats_curr_id_exists FOREIGN KEY (stats_curr_id) REFERENCES binding_stats (id) ON DELETE SET NULL;
ALTER TABLE binding ADD CONSTRAINT stats_prev_id_exists FOREIGN KEY (stats_prev_id) REFERENCES binding_stats (id) ON DELETE SET NULL;
@@ -385,6 +394,10 @@
ALTER TABLE broker ADD CONSTRAINT system_id_exists FOREIGN KEY (system_id) REFERENCES system (id) ON DELETE SET NULL;
+ALTER TABLE broker ADD CONSTRAINT broker_cluster_id_exists FOREIGN KEY (broker_cluster_id) REFERENCES broker_cluster (id) ON DELETE SET NULL;
+
+ALTER TABLE broker ADD CONSTRAINT broker_profile_id_exists FOREIGN KEY (broker_profile_id) REFERENCES broker_profile (id) ON DELETE SET NULL;
+
ALTER TABLE broker_stats ADD CONSTRAINT broker_id_exists FOREIGN KEY (broker_id) REFERENCES broker (id) ON DELETE SET NULL;
ALTER TABLE client ADD CONSTRAINT stats_curr_id_exists FOREIGN KEY (stats_curr_id) REFERENCES client_stats (id) ON DELETE SET NULL;
17 years
rhmessaging commits: r1398 - mgmt/bin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-11-29 14:18:28 -0500 (Thu, 29 Nov 2007)
New Revision: 1398
Modified:
mgmt/bin/devel-reload-database
Log:
Also load some demo data.
Modified: mgmt/bin/devel-reload-database
===================================================================
--- mgmt/bin/devel-reload-database 2007-11-29 17:20:49 UTC (rev 1397)
+++ mgmt/bin/devel-reload-database 2007-11-29 19:18:28 UTC (rev 1398)
@@ -7,3 +7,5 @@
psql -d "$1" -c "drop schema public cascade; create schema public"
psql -d "$1" -f "$DEVEL_HOME"/mint/python/mint/schema.sql
+
+python "$DEVEL_HOME"/cumin/python/cumin/demo.py postgresql://localhost/"$1"
17 years