rhmessaging commits: r924 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: cctrieloff
Date: 2007-09-14 14:45:18 -0400 (Fri, 14 Sep 2007)
New Revision: 924
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
Log:
more async journal integration - default still set to sync
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-14 18:21:12 UTC (rev 923)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-14 18:45:18 UTC (rev 924)
@@ -44,6 +44,9 @@
static const u_int8_t MESSAGE_MESSAGE = 1;
static const u_int8_t BASIC_MESSAGE = 2;
+// cct delete this !!!!
+// bool hack = false;
+
BdbMessageStore::BdbMessageStore(const char* envpath) : env(0),
queueDb(&env, 0),
exchangeDb(&env, 0),
@@ -109,7 +112,18 @@
(*i)->truncate(txn, &count, 0);
}
- txn->commit(0);
+ txn->commit(0);
+
+ if (usingJrnl())
+ {
+ try{
+ journal::jdir::delete_dir(getJrnlBaseDir(),true);
+ }
+ catch ( journal::jexception& e) {
+ std::string str;
+ THROW_STORE_EXCEPTION("Truncate clean up failed: " +e.to_string(str) );
+ }
+ }
}
void BdbMessageStore::create(PersistableQueue& queue)
@@ -149,12 +163,11 @@
{
destroy(queueDb, queue);
qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
- journal::jcntl* jQueue = static_cast<journal::jcntl*>(eqs);
- if (jQueue)
+ if (eqs)
{
+ journal::jcntl* jQueue = static_cast<journal::jcntl*>(eqs);
jQueue->delete_jrnl();
- delete(jQueue);
- queue.setExternalQueueStore(NULL);
+ queue.setExternalQueueStore(NULL); // will delete the journal if exists
}
}
@@ -292,6 +305,25 @@
RecoverableQueue::shared_ptr queue = registry.recoverQueue(buffer);
//set the persistenceId and update max as required
queue->setPersistenceId(key.id);
+
+ const char* queueName = queue->getName().c_str();
+
+ if (usingJrnl())
+ {
+
+ journal::jcntl* jQueue = new journal::jcntl(queueName, getJrnlDir(queueName), string("JournalData"));
+ queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
+
+ try
+ {
+ jQueue->recover();
+ } catch (journal::jexception& e) {
+ std::string s;
+ THROW_STORE_EXCEPTION(e.to_string(s) + queueName);
+ }
+ }
+
+
index[key.id] = queue;
maxQueueId = max(key.id, maxQueueId);
}
@@ -500,7 +532,7 @@
try {
Dbt key (&messageId, sizeof(messageId));
messageId = messageIdSequence.next();
- store(NULL, txn.get(), key, msg);
+ store(NULL, txn.get(), key, msg, true);
msg.setPersistenceId(messageId);
txn.commit();
} catch (std::exception& e) {
@@ -618,17 +650,19 @@
}
try {
+
+ bool newId = false;
if (messageId == 0) {
messageId = messageIdSequence.next();
- store(&queue, txn->get(), key, msg);
msg.setPersistenceId(messageId);
- }
- if (!usingJrnl()) {
- msg.enqueueComplete(); // set enqueued for ack
- }
-
- if (!usingJrnl())
+ newId = true;
+ }
+ store(&queue, txn->get(), key, msg, newId);
+
+ if (/*!hack || */ !usingJrnl()){
+ msg.enqueueComplete(); // set enqueued for ack
put(mappingDb, txn->get(), key, value);
+ }
// cct if using Journal do we need to wait for IO to complete before calling thus???
// set enqueue comple on callback msg.enqueueComplete();
@@ -643,7 +677,10 @@
}
}
-void BdbMessageStore::store(const PersistableQueue* queue, DbTxn* txn, Dbt& messageId, PersistableMessage& message)
+void BdbMessageStore::store(const PersistableQueue* queue,
+ DbTxn* txn, Dbt& messageId,
+ PersistableMessage& message,
+ bool newId)
{
u_int32_t headerSize = message.encodedHeaderSize();
u_int64_t size = message.encodedSize() + sizeof(u_int32_t);
@@ -652,11 +689,13 @@
buffer.putLong(headerSize);
message.encode(buffer);
//buffer.flip();
-
+
+
try {
- if (queue && usingJrnl()){
-
+ if (/*hack &&*/ queue && usingJrnl()){
+
+ if (queue){
// cct TODO -- delete this in the callback...
journal::data_tok* dtokp = new journal::data_tok();
dtokp->setSourceMessage (&message);
@@ -667,11 +706,14 @@
while (!written)
{
journal::jcntl* jc = static_cast<journal::jcntl*>(queue->getExternalQueueStore());
- rhm::journal::iores eres = jc->enqueue_data(buff, size, dtokp);
+ char text[4];
+ strcpy(text,"123");
+
+ rhm::journal::iores eres = jc->enqueue_data(&text, 3, dtokp);
switch (eres)
{
case rhm::journal::RHM_IORES_SUCCESS:
- if (dtokp->wstate() >= rhm::journal::data_tok::ENQ_SUBM)
+ if (dtokp->wstate() >= rhm::journal::data_tok::ENQ_SUBM)
written = true;
break;
case rhm::journal::RHM_IORES_AIO_WAIT:
@@ -688,13 +730,20 @@
assert( "Unexpected msg state");
}
}
+ }
+
} else {
/// cct message db
- Dbt data(buff,size);
- messageDb.put(txn, &messageId, &data, DB_NOOVERWRITE);
+ if (newId){ // only store in Bd if first time message is stored
+ Dbt data(buff,size);
+ messageDb.put(txn, &messageId, &data, DB_NOOVERWRITE);
+ }
}
-
- } catch (DbException& e) {
+ }catch ( journal::jexception& e) {
+ std::string str;
+ std::cout << "----" << e.to_string(str) << std::endl;
+ THROW_STORE_EXCEPTION("Truncate clean up failed: " +e.to_string(str) );
+ }catch (DbException& e) {
THROW_STORE_EXCEPTION_2("Error storing message", e);
}
}
@@ -930,17 +979,32 @@
}
}
+string BdbMessageStore::getJrnlBaseDir()
+{
+ std::stringstream dir;
+ dir << "/var/rhm/" ;
+ return dir.str();
+}
+
string BdbMessageStore::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/
{
+ return getJrnlDir(queue.getName().c_str());
+}
+
+string BdbMessageStore::getJrnlDir(const char* queueName) //for exmaple /var/rhm/ + queueDir/
+{
std::stringstream dir;
- dir << "/var/rhm/" ;
+ dir << getJrnlBaseDir();
dir << std::setw(4);
dir << std::setfill('0');
- dir << (atol(queue.getName().c_str())%20);
- dir << "/" << queue.getName() << "/";
+// const char* str = queueName; //queue.getName().c_str();
+ u_int32_t count = 0;
+ for (u_int32_t i=0; i < strlen(queueName); i++)
+ count += queueName[i];
+
+ dir << (count%20);
+ dir << "/" << queueName << "/";
return dir.str();
}
-
-
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-09-14 18:21:12 UTC (rev 923)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-09-14 18:45:18 UTC (rev 924)
@@ -85,7 +85,10 @@
void readXids(Db& db, std::set<string>& xids);
void readLockedMappings(Db& db, txn_lock_map& mappings);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
- void store(const qpid::broker::PersistableQueue* queue, DbTxn* txn, Dbt& messageId, qpid::broker::PersistableMessage& message);
+ void store(const qpid::broker::PersistableQueue* queue, DbTxn* txn,
+ Dbt& messageId,
+ qpid::broker::PersistableMessage& message,
+ bool newId);
void enqueue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
bool dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId);
bool deleteIfUnused(Cursor& cursor, DbTxn* txn, Dbt& messageId);
@@ -104,7 +107,9 @@
// journal functions
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
+ string getJrnlDir(const char* queueName);
inline bool usingJrnl() {return false;} // make configurable
+ string getJrnlBaseDir();
public:
BdbMessageStore(const char* envpath = 0);
16 years, 9 months
rhmessaging commits: r923 - store/trunk/cpp/lib/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-09-14 14:21:12 -0400 (Fri, 14 Sep 2007)
New Revision: 923
Modified:
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jerrno.cpp
store/trunk/cpp/lib/jrnl/jerrno.hpp
store/trunk/cpp/lib/jrnl/nlfh.cpp
store/trunk/cpp/lib/jrnl/nlfh.hpp
store/trunk/cpp/lib/jrnl/pmgr.cpp
store/trunk/cpp/lib/jrnl/wrfc.cpp
Log:
Added simple timeout to blocking jcntl::aio_cmpl_wait(); some obsolete commented-out code removed.
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-09-14 17:33:14 UTC (rev 922)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-09-14 18:21:12 UTC (rev 923)
@@ -87,7 +87,8 @@
case DEQ:
return "DEQ";
}
- return "<unknown>";
+ // Not using default: forces compiler to ensure all cases are covered.
+ return "<wstate unknown>";
}
const char*
@@ -110,7 +111,8 @@
case READ:
return "READ";
}
- return "<unknown>";
+ // Not using default: forces compiler to ensure all cases are covered.
+ return "<rstate unknown>";
}
void
@@ -126,15 +128,6 @@
_rstate = rstate;
}
-const u_int64_t
-data_tok::rid() const throw (jexception)
-{
-// if (_wstate == NONE)
-// throw new jexception(jerrno::JERR_DTOK_RIDNOTSET,
-// "Instance in write state NONE; rid not known.", "data_tok","rid");
- return _rid;
-}
-
void
data_tok::reset()
{
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-09-14 17:33:14 UTC (rev 922)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-09-14 18:21:12 UTC (rev 923)
@@ -126,14 +126,12 @@
inline void incr_dblocks_written(u_int32_t dblks_written)
{ _dblks_written += dblks_written; }
inline void set_dblocks_written(u_int32_t dblks_written) { _dblks_written = dblks_written; }
-// inline void reset_dblks_proc() { _dblks_written = 0; }
inline const u_int32_t dblocks_read() const { return _dblks_read; }
inline void incr_dblocks_read(u_int32_t dblks_read) { _dblks_read += dblks_read; }
inline void set_dblocks_read(u_int32_t dblks_read) { _dblks_read = dblks_read; }
-// inline void reset_dblks_read() { _dblks_read = 0; }
- const u_int64_t rid() const throw (jexception);
+ inline const u_int64_t rid() const { return _rid; }
inline void set_rid(const u_int64_t rid) { _rid = rid; }
inline const u_int64_t dequeue_rid() const throw (jexception) {return _dequeue_rid; }
inline void set_dequeue_rid(const u_int64_t rid) { _dequeue_rid = rid; }
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-14 17:33:14 UTC (rev 922)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-14 18:21:12 UTC (rev 923)
@@ -201,13 +201,20 @@
of.close();
}
-#define AIO_CMPL_SLEEP 10000 // 10 ms
+#define AIO_CMPL_SLEEP 10000 // 10 ms
+#define MAX_AIO_CMPL_SLEEPS 1000 // Total: 10 sec
+
void
jcntl::aio_cmpl_wait() throw (jexception)
{
- while (_wmgr.get_aio_evt_rem())
+ u_int32_t cnt = 0;
+ u_int32_t rem = _wmgr.get_aio_evt_rem();
+ while (rem)
{
get_wr_events();
+ rem = _wmgr.get_aio_evt_rem();
+ if (cnt++ > MAX_AIO_CMPL_SLEEPS)
+ throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "aio_cmpl_wait");
usleep(AIO_CMPL_SLEEP);
}
}
Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-09-14 17:33:14 UTC (rev 922)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-09-14 18:21:12 UTC (rev 923)
@@ -53,6 +53,7 @@
// class jcntl
const u_int32_t jerrno::JERR_JCNTL_STOPPED = 0x0200;
+const u_int32_t jerrno::JERR_JCNTL_AIOCMPLWAIT = 0x0201;
// class jdir
const u_int32_t jerrno::JERR_JDIR_MKDIR = 0x0300;
@@ -116,6 +117,8 @@
// class jcntl
_err_map[JERR_JCNTL_STOPPED] = std::string("JERR_JCNTL_STOPPED: Operation on stopped journal.");
+ _err_map[JERR_JCNTL_AIOCMPLWAIT] = std::string("JERR_JCNTL_AIOCMPLWAIT: "
+ "Timeout waiting for AIOs to complete.");
// class jdir
_err_map[JERR_JDIR_MKDIR] = std::string("JERR_JDIR_MKDIR: Directory creation failed.");
Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp 2007-09-14 17:33:14 UTC (rev 922)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp 2007-09-14 18:21:12 UTC (rev 923)
@@ -70,6 +70,7 @@
// class jcntl
static const u_int32_t JERR_JCNTL_STOPPED; ///< Operation on stopped journal
+ static const u_int32_t JERR_JCNTL_AIOCMPLWAIT; ///< Timeout waiting for AIOs to complete
// class jdir
static const u_int32_t JERR_JDIR_MKDIR; ///< Directory creation failed
Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-09-14 17:33:14 UTC (rev 922)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-09-14 18:21:12 UTC (rev 923)
@@ -316,40 +316,6 @@
return _wr_cmpl_cnt_dblks;
}
-// const bool
-// nlfh::is_rd_compl() const throw (jexception)
-// {
-// return rd_remaining_dblks() == 0 &&
-// (JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1)) - _wr_cmpl_cnt_dblks == 0;
-// }
-//
-// const u_int32_t
-// nlfh::rd_remaining_dblks() const throw (jexception)
-// {
-// if (_rd_subm_cnt_dblks > _wr_subm_cnt_dblks)
-// {
-// std::stringstream ss;
-// ss << "_rd_subm_cnt_dblks=" << _rd_subm_cnt_dblks;
-// ss << " _wr_subm_cnt_dblks=" << _wr_subm_cnt_dblks;
-// throw jexception(jerrno::JERR_NFLH_RDOFFSOVFL, ss.str(), "nlfh", "rd_remaining_dblks");
-// }
-// return _wr_cmpl_cnt_dblks - _rd_subm_cnt_dblks;
-// }
-//
-// const u_int32_t
-// nlfh::wr_remaining_dblks() const throw (jexception)
-// {
-// assert(_wr_subm_cnt_dblks <= JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1));
-// if (_wr_subm_cnt_dblks > JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1)) // Allow for file header
-// {
-// std::stringstream ss;
-// ss << "_wr_subm_cnt_dblks=" << _wr_subm_cnt_dblks;
-// ss << " fsize=" << JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1) << " dblks";
-// throw jexception(jerrno::JERR_NLFH_FILEOFFSOVFL, ss.str(), "nlfh", "wr_remaining_dblks");
-// }
-// return (JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1)) - _wr_subm_cnt_dblks;
-// }
-
const bool
nlfh::will_fit(const size_t rec_size) const
{
Modified: store/trunk/cpp/lib/jrnl/nlfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.hpp 2007-09-14 17:33:14 UTC (rev 922)
+++ store/trunk/cpp/lib/jrnl/nlfh.hpp 2007-09-14 18:21:12 UTC (rev 923)
@@ -129,12 +129,6 @@
{ return _wr_subm_cnt_dblks - _wr_cmpl_cnt_dblks; }
inline const bool wr_file_rotate() const { return is_wr_full(); }
-// const u_int32_t rd_remaining_dblks() const throw (jexception);
-// const bool is_rd_compl() const throw (jexception);
-// const u_int32_t wr_remaining_dblks() const throw (jexception);
-// inline const bool is_wr_compl() const throw (jexception)
-// { return wr_remaining_dblks() == 0; }
-
const bool will_fit(const size_t rec_size) const;
protected:
Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-09-14 17:33:14 UTC (rev 922)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-09-14 18:21:12 UTC (rev 923)
@@ -258,9 +258,8 @@
return "AIO_PENDING";
case AIO_COMPLETE:
return "AIO_COMPLETE";
- default:
- return "<unknown state>";
}
+ return "<page_state unknown>";
}
} // namespace journal
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-09-14 17:33:14 UTC (rev 922)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-09-14 18:21:12 UTC (rev 923)
@@ -74,7 +74,6 @@
if (_fh_index == _nfiles)
_fh_index = 0;
_curr_fh = _fh_arr[_fh_index];
-//std::cout << "wrfc::rotate() RESET fid=" << _fh_index << std::flush;
return reset(); //Checks if file is still in use (ie not fully dequeued yet)
}
16 years, 9 months
rhmessaging commits: r922 - store/trunk/cpp/lib/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-09-14 13:33:14 -0400 (Fri, 14 Sep 2007)
New Revision: 922
Modified:
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
Log:
Added a blocking ability to jcntl::stop() which will wait for all remaining write AIO events to complete before returning.
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-14 17:10:49 UTC (rev 921)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-14 17:33:14 UTC (rev 922)
@@ -120,12 +120,8 @@
void
jcntl::delete_jrnl() throw (jexception)
{
- stop();
-
- // wait for AIO or issue cancel...
-
+ stop(true); // wait for AIO to complete
_jdir.delete_dir();
-
}
@@ -164,11 +160,13 @@
}
void
-jcntl::stop() throw (jexception)
+jcntl::stop(bool block_for_aio_cmpl) throw (jexception)
{
check_status("stop");
_stop_flag = true;
flush();
+ if (block_for_aio_cmpl)
+ aio_cmpl_wait();
}
// Private functions
@@ -203,7 +201,18 @@
of.close();
}
+#define AIO_CMPL_SLEEP 10000 // 10 ms
void
+jcntl::aio_cmpl_wait() throw (jexception)
+{
+ while (_wmgr.get_aio_evt_rem())
+ {
+ get_wr_events();
+ usleep(AIO_CMPL_SLEEP);
+ }
+}
+
+void
jcntl::aio_wr_callback(jcntl* journal, u_int32_t num_dtoks)
{
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-14 17:10:49 UTC (rev 921)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-14 17:33:14 UTC (rev 922)
@@ -257,8 +257,11 @@
*
* <b>Note:</b> The jouranl cannot be restarted without either initailizing it or restoring
* it.
+ *
+ * \param block_for_aio_cmpl If true, will block the thread while waiting for all outstanding
+ * AIO operations to complete.
*/
- void stop() throw (jexception);
+ void stop(bool block_for_aio_cmpl = false) throw (jexception);
/**
* \brief Force a flush of the write page cache, creating a single AIO write operation.
@@ -312,6 +315,11 @@
*/
void write_infofile() const throw (jexception);
+ /**
+ * \brief Call that blocks while waiting for all outstanding AIOs to complete
+ */
+ void aio_cmpl_wait() throw (jexception);
+
/**
* Intenal callback write
*/
16 years, 9 months
rhmessaging commits: r921 - in store/trunk/cpp: lib/jrnl and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-09-14 13:10:49 -0400 (Fri, 14 Sep 2007)
New Revision: 921
Modified:
store/trunk/cpp/lib/Makefile.am
store/trunk/cpp/lib/jrnl/data_rec.cpp
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/enq_map.cpp
store/trunk/cpp/lib/jrnl/enq_map.hpp
store/trunk/cpp/lib/jrnl/jdir.cpp
store/trunk/cpp/lib/jrnl/jerrno.cpp
store/trunk/cpp/lib/jrnl/jerrno.hpp
store/trunk/cpp/lib/jrnl/nlfh.cpp
store/trunk/cpp/lib/jrnl/nlfh.hpp
store/trunk/cpp/lib/jrnl/pmgr.cpp
store/trunk/cpp/lib/jrnl/pmgr.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/lib/jrnl/rmgr.hpp
store/trunk/cpp/lib/jrnl/rrfc.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
store/trunk/cpp/lib/jrnl/wrfc.cpp
store/trunk/cpp/lib/jrnl/wrfc.hpp
store/trunk/cpp/tests/jrnl/JournalTest.cpp
store/trunk/cpp/tests/jrnl/Makefile.am
store/trunk/cpp/tests/jrnl/Makefile.rtest
store/trunk/cpp/tests/jrnl/jtest.cpp
store/trunk/cpp/tests/jrnl/jtest.hpp
store/trunk/cpp/tests/jrnl/msg_consumer.cpp
store/trunk/cpp/tests/jrnl/msg_consumer.hpp
store/trunk/cpp/tests/jrnl/msg_producer.cpp
store/trunk/cpp/tests/jrnl/msg_producer.hpp
store/trunk/cpp/tests/jrnl/rtest
store/trunk/cpp/tests/jrnl/rwtests.csv
store/trunk/cpp/tests/jrnl/tests.ods
Log:
Bug fixes to the journal read manager logic; there is still a good deal of commented-out debug code in the rmgr files.
Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/Makefile.am 2007-09-14 17:10:49 UTC (rev 921)
@@ -1,5 +1,4 @@
-AM_CXXFLAGS = $(WARNING_CFLAGS) $(APR_CXXFLAGS) $(QPID_CXXFLAGS) \
- -DRHM_CLEAN -DRHM_WRONLY -DRHM_TESTVALS
+AM_CXXFLAGS = $(WARNING_CFLAGS) $(APR_CXXFLAGS) $(QPID_CXXFLAGS)
lib_LTLIBRARIES = libbdbstore.la
Modified: store/trunk/cpp/lib/jrnl/data_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_rec.cpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/data_rec.cpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -133,6 +133,8 @@
{
if (size_dblks(rec_size()) - rec_offs_dblks > max_size_dblks) // Further split required
{
+ // TODO: The header will never be split as it will always be less than one dblk.
+ // Remove this check...
size_t wsize = sizeof(_enq_hdr) > rec_offs ? sizeof(_enq_hdr) - rec_offs : 0;
size_t wsize2 = wsize;
if (wsize)
@@ -177,6 +179,8 @@
}
else // No further split required
{
+ // TODO: The header will never be split as it will always be less than one dblk.
+ // Remove this check...
size_t wsize = sizeof(_enq_hdr) > rec_offs ? sizeof(_enq_hdr) - rec_offs : 0;
if (wsize)
{
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -45,11 +45,13 @@
u_int64_t data_tok::_cnt = 0;
data_tok::data_tok():
+ // FIXME: Make this thread safe!
_icnt(_cnt++),
_wstate(NONE),
_rstate(UNREAD),
_dsize(0),
- _dblks_proc(0),
+ _dblks_written(0),
+ _dblks_read(0),
_rid(0),
_sourceMsg(NULL)
{}
@@ -103,6 +105,8 @@
return "NONE";
case READ_PART:
return "READ_PART";
+ case SKIP_PART:
+ return "SKIP_PART";
case READ:
return "READ";
}
@@ -125,9 +129,9 @@
const u_int64_t
data_tok::rid() const throw (jexception)
{
-/* if (_wstate == NONE)
- throw new jexception(jerrno::JERR_DTOK_RIDNOTSET,
- "Instance in write state NONE; rid not known.", "data_tok","rid"); */
+// if (_wstate == NONE)
+// throw new jexception(jerrno::JERR_DTOK_RIDNOTSET,
+// "Instance in write state NONE; rid not known.", "data_tok","rid");
return _rid;
}
@@ -137,7 +141,8 @@
_wstate = NONE;
_rstate = UNREAD;
_dsize = 0;
- _dblks_proc = 0;
+ _dblks_written = 0;
+ _dblks_read = 0;
_rid = 0;
}
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -82,26 +82,28 @@
{
UNREAD, ///< Data block not read
READ_PART, ///< Data block is part-read; waiting for page buffer to fill
+ SKIP_PART, ///< Prev. dequeued dblock is part-skipped; waiting for page buffer to fill
READ ///< Data block is fully read
};
private:
static u_int64_t _cnt;
- u_int64_t _icnt;
+ u_int64_t _icnt;
write_state _wstate; ///< Enqueued / dequeued state of data
read_state _rstate; ///< Read state of data
size_t _dsize; ///< Data size in bytes
- u_int32_t _dblks_proc; ///< Data blocks read/written
+ u_int32_t _dblks_written; ///< Data blocks read/written
+ u_int32_t _dblks_read; ///< Data blocks read/written
u_int64_t _rid; ///< RID of data set by enqueue operation
- u_int64_t _dequeue_rid; ///< RID of data set by dequeue operation
- qpid::broker::PersistableMessage* _sourceMsg; ///< Pointer back to source Message in Broker
+ u_int64_t _dequeue_rid; ///< RID of data set by dequeue operation
+ qpid::broker::PersistableMessage* _sourceMsg; ///< Pointer back to source Message in Broker
public:
data_tok();
~data_tok();
- inline qpid::broker::PersistableMessage* getSourceMessage(){return _sourceMsg;}
- inline void setSourceMessage(qpid::broker::PersistableMessage* msg) {_sourceMsg = msg;}
+ inline qpid::broker::PersistableMessage* getSourceMessage(){return _sourceMsg;}
+ inline void setSourceMessage(qpid::broker::PersistableMessage* msg) {_sourceMsg = msg;}
inline const u_int64_t id() const { return _icnt; }
inline const write_state wstate() const {return _wstate; }
@@ -119,10 +121,18 @@
void set_rstate(const read_state rstate);
inline const size_t dsize() const { return _dsize; }
inline void set_dsize(size_t dsize) { _dsize = dsize; }
- inline const u_int32_t dblocks_proc() const { return _dblks_proc; }
- inline void incr_dblocks_proc(u_int32_t dblks_proc) { _dblks_proc += dblks_proc; }
- inline void set_dblocks_proc(u_int32_t dblks_proc) { _dblks_proc = dblks_proc; }
- inline void reset_dblks_proc() { _dblks_proc = 0; }
+
+ inline const u_int32_t dblocks_written() const { return _dblks_written; }
+ inline void incr_dblocks_written(u_int32_t dblks_written)
+ { _dblks_written += dblks_written; }
+ inline void set_dblocks_written(u_int32_t dblks_written) { _dblks_written = dblks_written; }
+// inline void reset_dblks_proc() { _dblks_written = 0; }
+
+ inline const u_int32_t dblocks_read() const { return _dblks_read; }
+ inline void incr_dblocks_read(u_int32_t dblks_read) { _dblks_read += dblks_read; }
+ inline void set_dblocks_read(u_int32_t dblks_read) { _dblks_read = dblks_read; }
+// inline void reset_dblks_read() { _dblks_read = 0; }
+
const u_int64_t rid() const throw (jexception);
inline void set_rid(const u_int64_t rid) { _rid = rid; }
inline const u_int64_t dequeue_rid() const throw (jexception) {return _dequeue_rid; }
Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -51,7 +51,7 @@
{}
void
-enq_map::insert_fid(u_int64_t rid, u_int16_t fid) throw (jexception)
+enq_map::insert_fid(const u_int64_t rid, const u_int16_t fid) throw (jexception)
{
_ret = _map.insert(std::pair<u_int64_t, u_int16_t>(rid, fid));
if (_ret.second == false)
@@ -63,8 +63,8 @@
}
}
-u_int16_t
-enq_map::get_remove_fid(u_int64_t rid) throw (jexception)
+const u_int16_t
+enq_map::get_fid(const u_int64_t rid) throw (jexception)
{
_it = _map.find(rid);
if (_it == _map.end())
@@ -74,6 +74,20 @@
ss << "rid=0x" << std::setw(16) << rid;
throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map", "get_remove_fid");
}
+ return _it->second;
+}
+
+const u_int16_t
+enq_map::get_remove_fid(const u_int64_t rid) throw (jexception)
+{
+ _it = _map.find(rid);
+ if (_it == _map.end())
+ {
+ std::stringstream ss;
+ ss << std::hex << std::setfill('0');
+ ss << "rid=0x" << std::setw(16) << rid;
+ throw jexception(jerrno::JERR_EMAP_NOTFOUND, ss.str(), "enq_map", "get_remove_fid");
+ }
u_int16_t fid = _it->second;
_map.erase(_it);
return fid;
Modified: store/trunk/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.hpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/enq_map.hpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -65,8 +65,10 @@
enq_map();
~enq_map();
- void insert_fid(u_int64_t rid, u_int16_t fid) throw (jexception);
- u_int16_t get_remove_fid(u_int64_t rid) throw (jexception);
+ void insert_fid(const u_int64_t rid, const u_int16_t fid) throw (jexception);
+ const u_int16_t get_fid(const u_int64_t rid) throw (jexception);
+ const u_int16_t get_remove_fid(const u_int64_t rid) throw (jexception);
+ inline void clear() { _map.clear(); }
inline const bool empty() const { return _map.empty(); }
inline const u_int16_t size() const { return (u_int16_t)_map.size(); }
};
Modified: store/trunk/cpp/lib/jrnl/jdir.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.cpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/jdir.cpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -106,8 +106,11 @@
void
-jdir::clear_dir(const std::string& dirname, const std::string& base_filename,
- const bool create_flag) throw (jexception)
+jdir::clear_dir(const std::string& dirname, const std::string&
+#ifndef RHM_JOWRITE
+ base_filename
+#endif
+ , const bool create_flag) throw (jexception)
{
DIR* dir = ::opendir(dirname.c_str());
if (!dir)
@@ -154,14 +157,14 @@
}
}
}
-/* if (errno)
- {
- std::stringstream ss;
- ss << "dir=\"" << dirname << "\" errno=" << errno;
- ::closedir(dir); // Try to close, it makes no sense to trap errors here...
- throw jexception(jerrno::JERR_JDIR_READDIR, ss.str(), "jdir", "clear_dir");
- }
- */#endif
+// if (errno)
+// {
+// std::stringstream ss;
+// ss << "dir=\"" << dirname << "\" errno=" << errno;
+// ::closedir(dir); // Try to close, it makes no sense to trap errors here...
+// throw jexception(jerrno::JERR_JDIR_READDIR, ss.str(), "jdir", "clear_dir");
+// }
+#endif
if (::closedir(dir))
{
std::stringstream ss;
@@ -238,14 +241,14 @@
}
}
}
-/* if (errno)
- {
- std::stringstream ss;
- ss << "dir=\"" << dirname << "\" errno=" << errno;
- ::closedir(dir); // Try to close, it makes no sense to trap errors here...
- throw jexception(jerrno::JERR_JDIR_READDIR, ss.str(), "jdir", "delete_dir");
- }
- */ }
+// if (errno)
+// {
+// std::stringstream ss;
+// ss << "dir=\"" << dirname << "\" errno=" << errno;
+// ::closedir(dir); // Try to close, it makes no sense to trap errors here...
+// throw jexception(jerrno::JERR_JDIR_READDIR, ss.str(), "jdir", "delete_dir");
+// }
+ }
// Now dir is empty, close and delete it
if (::closedir(dir))
{
@@ -295,17 +298,17 @@
}
}
}
-/* if (errno)
+// if (errno)
+// {
+// std::stringstream ss;
+// ss << "dir=\"" << dirname << "\" errno=" << errno;
+// ::closedir(dir); // Try to close, it makes no sense to trap errors here...
+// throw jexception(jerrno::JERR_JDIR_READDIR, ss.str(), "jdir", "clear_dir");
+// }
+ if (::closedir(dir))
{
std::stringstream ss;
ss << "dir=\"" << dirname << "\" errno=" << errno;
- ::closedir(dir); // Try to close, it makes no sense to trap errors here...
- throw jexception(jerrno::JERR_JDIR_READDIR, ss.str(), "jdir", "clear_dir");
- }
- */ if (::closedir(dir))
- {
- std::stringstream ss;
- ss << "dir=\"" << dirname << "\" errno=" << errno;
throw jexception(jerrno::JERR_JDIR_CLOSEDIR, ss.str(), "jdir", "create_bak_dir");
}
std::stringstream dn;
Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -90,6 +90,9 @@
// class rmgr
const u_int32_t jerrno::JERR_RMGR_UNKNOWNMAGIC = 0x0900;
+const u_int32_t jerrno::JERR_RMGR_RIDMISMATCH = 0x0901;
+const u_int32_t jerrno::JERR_RMGR_FIDMISMATCH = 0x0902;
+const u_int32_t jerrno::JERR_RMGR_ENQSTATE = 0x0903;
// class data_tok
const u_int32_t jerrno::JERR_DTOK_ILLEGALSTATE = 0x0a00;
@@ -160,6 +163,12 @@
// class rmgr
_err_map[JERR_RMGR_UNKNOWNMAGIC] = std::string("JERR_RMGR_UNKNOWNMAGIC: "
"Found record with unknown magic.");
+ _err_map[JERR_RMGR_RIDMISMATCH] = std::string("JERR_RMGR_RIDMISMATCH: "
+ "RID mismatch between current record and dtok RID");
+ _err_map[JERR_RMGR_FIDMISMATCH] = std::string("JERR_RMGR_FIDMISMATCH: "
+ "FID mismatch between emap and rrfc");
+ _err_map[JERR_RMGR_ENQSTATE] = std::string("JERR_RMGR_ENQSTATE: "
+ "Attempted read when data token wstate was not ENQ");
// class data_tok
_err_map[JERR_DTOK_ILLEGALSTATE] = std::string("JERR_MTOK_ILLEGALSTATE: "
@@ -168,8 +177,9 @@
// class enq_map
_err_map[JERR_EMAP_DUPLICATE] = std::string("JERR_EMAP_DUPLICATE: "
- "Attempted to insert using duplicat key.");
- _err_map[JERR_EMAP_NOTFOUND] = std::string("JERR_EMAP_NOTFOUND: Key not found in map.");
+ "Attempted to insert enqueue record using duplicate key.");
+ _err_map[JERR_EMAP_NOTFOUND] = std::string("JERR_EMAP_NOTFOUND: "
+ "Key not found in enqueue map.");
//_err_map[] = std::string("");
Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -107,6 +107,9 @@
// class rmgr
static const u_int32_t JERR_RMGR_UNKNOWNMAGIC; ///< Found record with unknown magic
+ static const u_int32_t JERR_RMGR_RIDMISMATCH; ///< RID mismatch between rec and dtok
+ static const u_int32_t JERR_RMGR_FIDMISMATCH; ///< FID mismatch between emap and rrfc
+ static const u_int32_t JERR_RMGR_ENQSTATE; ///< Attempted read when wstate not ENQ
// class data_tok
static const u_int32_t JERR_DTOK_ILLEGALSTATE; ///< Attempted to change to illegal state
Modified: store/trunk/cpp/lib/jrnl/nlfh.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/nlfh.cpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -33,6 +33,7 @@
#include <jrnl/nlfh.hpp>
+#include <assert.h>
#include <cerrno>
#include <fcntl.h>
#include <iomanip>
@@ -315,32 +316,40 @@
return _wr_cmpl_cnt_dblks;
}
-const u_int32_t
-nlfh::rd_remaining_dblks() const throw (jexception)
-{
- if (_rd_subm_cnt_dblks > _wr_subm_cnt_dblks)
- {
- std::stringstream ss;
- ss << "_rd_subm_cnt_dblks=" << _rd_subm_cnt_dblks;
- ss << " _wr_subm_cnt_dblks=" << _wr_subm_cnt_dblks;
- throw jexception(jerrno::JERR_NFLH_RDOFFSOVFL, ss.str(), "nlfh", "rd_remaining_dblks");
- }
- return _wr_cmpl_cnt_dblks - _rd_subm_cnt_dblks;
-}
+// const bool
+// nlfh::is_rd_compl() const throw (jexception)
+// {
+// return rd_remaining_dblks() == 0 &&
+// (JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1)) - _wr_cmpl_cnt_dblks == 0;
+// }
+//
+// const u_int32_t
+// nlfh::rd_remaining_dblks() const throw (jexception)
+// {
+// if (_rd_subm_cnt_dblks > _wr_subm_cnt_dblks)
+// {
+// std::stringstream ss;
+// ss << "_rd_subm_cnt_dblks=" << _rd_subm_cnt_dblks;
+// ss << " _wr_subm_cnt_dblks=" << _wr_subm_cnt_dblks;
+// throw jexception(jerrno::JERR_NFLH_RDOFFSOVFL, ss.str(), "nlfh", "rd_remaining_dblks");
+// }
+// return _wr_cmpl_cnt_dblks - _rd_subm_cnt_dblks;
+// }
+//
+// const u_int32_t
+// nlfh::wr_remaining_dblks() const throw (jexception)
+// {
+// assert(_wr_subm_cnt_dblks <= JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1));
+// if (_wr_subm_cnt_dblks > JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1)) // Allow for file header
+// {
+// std::stringstream ss;
+// ss << "_wr_subm_cnt_dblks=" << _wr_subm_cnt_dblks;
+// ss << " fsize=" << JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1) << " dblks";
+// throw jexception(jerrno::JERR_NLFH_FILEOFFSOVFL, ss.str(), "nlfh", "wr_remaining_dblks");
+// }
+// return (JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1)) - _wr_subm_cnt_dblks;
+// }
-const u_int32_t
-nlfh::wr_remaining_dblks() const throw (jexception)
-{
- if (_wr_subm_cnt_dblks > JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1)) // Allow for file header
- {
- std::stringstream ss;
- ss << "_wr_subm_cnt_dblks=" << _wr_subm_cnt_dblks;
- ss << " fsize=" << JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1) << " dblks";
- throw jexception(jerrno::JERR_NLFH_FILEOFFSOVFL, ss.str(), "nlfh", "wr_remaining_dblks");
- }
- return (JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1)) - _wr_subm_cnt_dblks;
-}
-
const bool
nlfh::will_fit(const size_t rec_size) const
{
@@ -384,5 +393,9 @@
}
}
+// static const definition
+
+const u_int32_t nlfh::_fsize_dblks = JRNL_SBLK_SIZE * (JRNL_FILE_SIZE + 1);
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/nlfh.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/nlfh.hpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/nlfh.hpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -57,15 +57,16 @@
class nlfh
{
protected:
+ static const u_int32_t _fsize_dblks; ///< File size in dblks
std::string _fname; ///< File name
u_int16_t _fid; ///< File ID (ordinal number in ring buffer)
int _rd_fh; ///< Read file handle
int _wr_fh; ///< Write file handle
u_int32_t _rec_enqcnt; ///< Count of enqueued records
- u_int32_t _rd_subm_cnt_dblks; ///< Read file count (data blocks) for submitted AIO
- u_int32_t _rd_cmpl_cnt_dblks; ///< Read file count (data blocks) for completed AIO
- u_int32_t _wr_subm_cnt_dblks; ///< Write file count (data blocks) for submitted AIO
- u_int32_t _wr_cmpl_cnt_dblks; ///< Write file count (data blocks) for completed AIO
+ u_int32_t _rd_subm_cnt_dblks; ///< Read file count (data blocks) for submitted AIO
+ u_int32_t _rd_cmpl_cnt_dblks; ///< Read file count (data blocks) for completed AIO
+ u_int32_t _wr_subm_cnt_dblks; ///< Write file count (data blocks) for submitted AIO
+ u_int32_t _wr_cmpl_cnt_dblks; ///< Write file count (data blocks) for completed AIO
public:
nlfh();
@@ -109,13 +110,31 @@
// Derived helper functions
- const u_int32_t rd_remaining_dblks() const throw (jexception);
- inline const bool is_rd_compl() const throw (jexception)
- { return rd_remaining_dblks() == 0; }
- const u_int32_t wr_remaining_dblks() const throw (jexception);
- inline const bool is_wr_compl() const throw (jexception)
- { return wr_remaining_dblks() == 0; }
- inline const bool empty() const { return _wr_subm_cnt_dblks == 0; }
+ inline const bool rd_empty() const { return _wr_cmpl_cnt_dblks == 0; }
+ inline const u_int32_t rd_remaining_dblks() const
+ { return _wr_cmpl_cnt_dblks - _rd_subm_cnt_dblks; }
+ inline const bool is_rd_full() const { return _wr_cmpl_cnt_dblks == _rd_subm_cnt_dblks; }
+ inline const bool is_rd_compl() const
+ { return _wr_cmpl_cnt_dblks == _rd_cmpl_cnt_dblks; }
+ inline const u_int32_t rd_aio_outstanding_dblks() const
+ { return _rd_subm_cnt_dblks - _rd_cmpl_cnt_dblks; }
+ inline const bool rd_file_rotate() const { return is_rd_full() && is_wr_compl(); }
+
+ inline const bool wr_empty() const { return _wr_subm_cnt_dblks == 0; }
+ inline const u_int32_t wr_remaining_dblks() const
+ { return _fsize_dblks - _wr_subm_cnt_dblks; }
+ inline const bool is_wr_full() const { return _fsize_dblks == _wr_subm_cnt_dblks; }
+ inline const bool is_wr_compl() const { return _fsize_dblks == _wr_cmpl_cnt_dblks; }
+ inline const u_int32_t wr_aio_outstanding_dblks() const
+ { return _wr_subm_cnt_dblks - _wr_cmpl_cnt_dblks; }
+ inline const bool wr_file_rotate() const { return is_wr_full(); }
+
+// const u_int32_t rd_remaining_dblks() const throw (jexception);
+// const bool is_rd_compl() const throw (jexception);
+// const u_int32_t wr_remaining_dblks() const throw (jexception);
+// inline const bool is_wr_compl() const throw (jexception)
+// { return wr_remaining_dblks() == 0; }
+
const bool will_fit(const size_t rec_size) const;
protected:
Modified: store/trunk/cpp/lib/jrnl/pmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/pmgr.cpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -76,7 +76,7 @@
pmgr::pmgr(jcntl* jc, enq_map& emap, const u_int32_t pagesize, const u_int16_t pages):
_pagesize(pagesize),
_pages(pages),
- _jc(jc),
+ _jc(jc),
_emap(emap),
_dtokl(NULL),
_page_base_ptr(NULL),
@@ -96,7 +96,7 @@
std::deque<data_tok*>* const dtokl) throw (jexception):
_pagesize(pagesize),
_pages(pages),
- _jc(jc),
+ _jc(jc),
_emap(emap),
_dtokl(dtokl),
_page_base_ptr(NULL),
@@ -131,6 +131,7 @@
_pg_offset_dblks = 0;
_aio_evt_rem = 0;
clean();
+ _emap.clear();
// 1. Allocate page memory (as a single block)
size_t pagesize = _pages * _pagesize * _sblksize;
@@ -244,5 +245,23 @@
}
}
+// Maps a page_state value to a printable name for diagnostics. Any value other
+// than the four states tested below yields "<unknown state>".
+const char*
+pmgr::page_state_str(page_state ps)
+{
+    if (ps == UNUSED)
+        return "UNUSED";
+    if (ps == IN_USE)
+        return "IN_USE";
+    if (ps == AIO_PENDING)
+        return "AIO_PENDING";
+    if (ps == AIO_COMPLETE)
+        return "AIO_COMPLETE";
+    return "<unknown state>";
+}
+
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -110,7 +110,7 @@
static const u_int32_t _sblksize; ///< Disk softblock size
const u_int32_t _pagesize; ///< Size of page cache pages
const u_int16_t _pages; ///< Number of page cache pages
- jcntl* _jc; ///< Pointer to journal controller
+ jcntl* _jc; ///< Pointer to journal controller
enq_map& _emap; ///< Ref to enqueue map
std::deque<data_tok*>* _dtokl; ///< Pointer to external data token list
void* _page_base_ptr; ///< Base pointer to page memory
@@ -136,6 +136,7 @@
virtual const u_int32_t get_events(page_state state) throw (jexception) = 0;
inline const u_int32_t get_aio_evt_rem() const { return _aio_evt_rem; }
+ static const char* page_state_str(page_state ps);
protected:
virtual void initialize() throw (jexception);
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -66,51 +66,142 @@
}
+// Read the next still-enqueued record from the read page cache into mbuf
+// (capacity mbsize), using dtokp to carry read state between calls.
+//
+// Dequeue and filler records are consumed silently; enqueue records whose RID is
+// no longer in the enqueue map are skipped. If dtokp already carries a RID it must
+// match the record found, otherwise the record's RID is stored in dtokp.
+//
+// Returns:
+//   RHM_IORES_EMPTY    - no dblks left in the current page and the read file has
+//                        had everything written so far submitted for read
+//   RHM_IORES_AIO_WAIT - the page needed is not yet AIO_COMPLETE; call again later
+//   otherwise the result of read_enq() for the record found
+// Throws jexception with:
+//   JERR_RMGR_ENQSTATE     - dtokp is not in a readable write state
+//   JERR_RMGR_RIDMISMATCH  - dtokp's RID differs from the record's RID
+//   JERR_RMGR_UNKNOWNMAGIC - record header magic is not ENQ, DEQ or EMPTY
+// Any enqueue-map error other than JERR_EMAP_NOTFOUND is rethrown unchanged.
const iores
-rmgr::read(void* const mbuf, const size_t mbsize, data_tok* dtok) throw (jexception)
+rmgr::read(void* const mbuf, const size_t mbsize, data_tok* dtokp) throw (jexception)
{
+//std::cout << " rmgr::read()" << std::flush;
if (_aio_evt_rem)
get_events();
- if(dblks_rem() == 0 && _rrfc.empty())
+//std::cout << " [a pi=" << _pg_index << " d=" << dblks_rem() << " f=" << (_rrfc.empty()?"T":"F") << "]" << std::flush;
+ if(dblks_rem() == 0 && _rrfc.is_full())
+ {
+ aio_cycle(); // check if any AIOs have returned
return RHM_IORES_EMPTY;
+ }
+//std::cout << " b" << std::flush;
+ // Check write state of this token is ENQ - required for read
+ if (!dtokp->is_readable())
+ {
+ std::stringstream ss;
+ ss << std::hex << std::setfill('0');
+ ss << "dtok_id=0x" << std::setw(8) << dtokp->id();
+ ss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid();
+ ss << "; dtok_wstate=" << dtokp->wstate_str();
+ throw jexception(jerrno::JERR_RMGR_ENQSTATE, ss.str(), "rmgr", "read");
+ }
- if (dtok->rstate() == data_tok::READ_PART)
+//std::cout << " c" << std::flush;
+ if (dtokp->rstate() == data_tok::SKIP_PART)
{
- assert(dtok->rid() == _hdr._rid);
+ const iores res = skip(dtokp);
+ if (res != RHM_IORES_SUCCESS)
+ return res;
+ }
+//std::cout << " d" << std::flush;
+ if (dtokp->rstate() == data_tok::READ_PART)
+ {
+ assert(dtokp->rid() == _hdr._rid);
void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] + (_pg_offset_dblks * JRNL_DBLK_SIZE));
- return read_enq(_hdr, rptr, dtok);
+ return read_enq(_hdr, rptr, dtokp);
}
- else
+//std::cout << " e" << std::flush;
+ _hdr.reset();
+ // Read header, determine next record type
+ while (true)
{
- _hdr.reset();
- // Read header, determine next record type
- while (true)
+//std::cout << " [f pi=" << _pg_index << " d=" << dblks_rem() << " f=" << (_rrfc.empty()?"T":"F") << "]" << std::flush;
+ if(dblks_rem() == 0 && _rrfc.is_full())
{
- if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+ aio_cycle(); // check if any AIOs have returned
+ return RHM_IORES_EMPTY;
+ }
+//std::cout << " g" << std::flush;
+ if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+ {
+ aio_cycle();
+ return RHM_IORES_AIO_WAIT;
+ }
+//std::cout << " h" << std::flush;
+ void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
+ (_pg_offset_dblks * JRNL_DBLK_SIZE));
+ ::memcpy(&_hdr, rptr, sizeof(hdr));
+ switch (_hdr._magic)
+ {
+ case RHM_JDAT_ENQ_MAGIC:
{
- aio_cycle();
- return RHM_IORES_AIO_WAIT;
+//std::cout << " E" << std::flush;
+ _data_rec.reset(mbuf, mbsize); // sets enqueue rec size
+ // Check if RID of this rec is still enqueued, if so read it, else skip
+#ifdef RHM_RDONLY
+ bool is_enq = true;
+#else
+ u_int16_t fid = 0;
+ bool is_enq = false;
+ try
+ {
+ fid = _emap.get_fid(_hdr._rid);
+ is_enq = true;
+ }
+ catch (jexception& e)
+ {
+ // "Not found" just means the record has been dequeued; anything else is
+ // a real error. Rethrow the original object rather than a copy of it.
+ if (e.err_code() != jerrno::JERR_EMAP_NOTFOUND)
+ throw;
+ }
+#endif
+ if (is_enq) // ok, this record is enqueued, check it, then read it...
+ {
+//std::cout << " $" << std::flush;
+ if (dtokp->rid())
+ {
+ if (_hdr._rid != dtokp->rid())
+ {
+ std::stringstream ss;
+ ss << std::hex << std::setfill('0');
+ ss << "rid=0x" << std::setw(16) << _hdr._rid;
+ ss << "; dtok_rid=0x" << std::setw(16) << dtokp->rid();
+ ss << "; dtok_id=0x" << std::setw(8) << dtokp->id();
+ throw jexception(jerrno::JERR_RMGR_RIDMISMATCH, ss.str(), "rmgr",
+ "read");
+ }
+ }
+ else
+ dtokp->set_rid(_hdr._rid);
+#ifndef RHM_RDONLY
+// TODO: Add member _fid to pmgr::page_cb which indicates the fid from which this page was
+// populated. When this value is set in wmgr::flush() somewhere, then uncomment the following
+// check:
+// if (fid != _page_cb_arr[_pg_index]._fid)
+// {
+// std::stringstream ss;
+// ss << std::hex << std::setfill('0');
+// ss << "rid=0x" << std::setw(16) << _hdr._rid;
+// ss << "; emap_fid=0x" << std::setw(4) << fid;
+// ss << "; current_fid=" << _rrfc.fid();
+// throw jexception(jerrno::JERR_RMGR_FIDMISMATCH, ss.str(), "rmgr", "read");
+// }
+#endif
+ return read_enq(_hdr, rptr, dtokp);
+ }
+ else
+ {
+//std::cout << " %" << std::flush;
+ // skip this record, it is already dequeued
+ const iores res = skip(dtokp);
+ // If the skip could not complete, report that to the caller now; the
+ // SKIP_PART check at the top of this function resumes it on the next call.
+ if (res != RHM_IORES_SUCCESS)
+ return res;
+ }
+ break;
}
- void* rptr = (void*)((char*)_page_ptr_arr[_pg_index] +
- (_pg_offset_dblks * JRNL_DBLK_SIZE));
- ::memcpy(&_hdr, rptr, sizeof(hdr));
- dtok->set_rid(_hdr._rid);
- switch (_hdr._magic)
- {
- case RHM_JDAT_ENQ_MAGIC:
- _data_rec.reset(mbuf, mbsize);
- return read_enq(_hdr, rptr, dtok);
- case RHM_JDAT_DEQ_MAGIC:
- consume_deq();
- break;
- case RHM_JDAT_EMPTY_MAGIC:
- consume_filler();
- break;
- default:
- std::stringstream ss;
- ss << std::setw(8) << std::setfill('0');
- ss << std::hex << "Magic=0x" << _hdr._magic << std::dec;
- throw jexception(jerrno::JERR_RMGR_UNKNOWNMAGIC, ss.str(), "rmgr", "read");
- }
+ case RHM_JDAT_DEQ_MAGIC:
+//std::cout << " D" << std::flush;
+ consume_deq();
+ break;
+ case RHM_JDAT_EMPTY_MAGIC:
+//std::cout << " X" << std::flush;
+ consume_filler();
+ break;
+ default:
+ std::stringstream ss;
+ ss << std::hex << std::setfill('0');
+ ss << "Magic=0x" << std::setw(8) << _hdr._magic << std::dec;
+ throw jexception(jerrno::JERR_RMGR_UNKNOWNMAGIC, ss.str(), "rmgr", "read");
}
}
}
@@ -161,6 +252,7 @@
// Use stored pointer to nlfh in the pcb instead.
pcbp->_rdblks = iocbp->u.c.nbytes / JRNL_DBLK_SIZE;
pcbp->_rfh->add_rd_cmpl_cnt_dblks(pcbp->_rdblks);
+//std::cout << "{r*" << pcbp->_index << " fid=" << pcbp->_rfh->fid() << "}" << std::flush;
// Clean up this pcb's data_tok list
pcbp->_pdtokl->clear();
@@ -181,45 +273,50 @@
}
+// Decode the enqueue record with header h, starting at rptr in the current page,
+// into _data_rec. dtokp->dblocks_read() tracks how many dblks of the record have
+// been decoded so far, so decoding continues across page rotations until it
+// reaches _data_rec.rec_size_dblks().
+//
+// Returns RHM_IORES_AIO_WAIT if the current page is not AIO_COMPLETE on entry, or
+// if a following page needed for the rest of the record is not AIO_COMPLETE (in
+// that case dtokp is marked READ_PART and its dsize is set). Returns
+// RHM_IORES_SUCCESS once the whole record is decoded, with dtokp marked READ and
+// its dsize set from _data_rec.
const iores
-rmgr::read_enq(hdr& h, void* rptr, data_tok* dtok)
+rmgr::read_enq(hdr& h, void* rptr, data_tok* dtokp)
throw (jexception)
{
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
+//std::cout << " U=" << page_state_str(_page_cb_arr[_pg_index]._state) << std::flush;
aio_cycle(); // check if any AIOs have returned
return RHM_IORES_AIO_WAIT;
}
+//std::cout << " V" << std::flush;
// Read data from this page, first block will have header and data size.
- u_int32_t dblks_rd = _data_rec.decode(h, rptr, dtok->dblocks_proc(), dblks_rem());
- dtok->incr_dblocks_proc(dblks_rd);
+ u_int32_t dblks_rd = _data_rec.decode(h, rptr, dtokp->dblocks_read(), dblks_rem());
+ dtokp->incr_dblocks_read(dblks_rd);
_pg_offset_dblks += dblks_rd;
// If data still incomplete, move to next page and decode again
- while (dtok->dblocks_proc() < _data_rec.rec_size_dblks())
+ while (dtokp->dblocks_read() < _data_rec.rec_size_dblks())
{
+//std::cout << " W" << std::flush;
rotate_page();
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
- dtok->set_rstate(data_tok::READ_PART);
- dtok->set_dsize(_data_rec.data_size());
+ dtokp->set_rstate(data_tok::READ_PART);
+ dtokp->set_dsize(_data_rec.data_size());
return RHM_IORES_AIO_WAIT;
}
rptr = (void*)((char*)_page_ptr_arr[_pg_index]);
- dblks_rd = _data_rec.decode(h, rptr, dtok->dblocks_proc(), dblks_rem());
- dtok->incr_dblocks_proc(dblks_rd);
+ dblks_rd = _data_rec.decode(h, rptr, dtokp->dblocks_read(), dblks_rem());
+ dtokp->incr_dblocks_read(dblks_rd);
_pg_offset_dblks += dblks_rd;
}
// If we have finished with this page, rotate it
if (dblks_rem() == 0)
+//{std::cout << " X" << std::flush;
rotate_page();
+//}
- // Set the record size in dtok
- dtok->set_rstate(data_tok::READ);
- dtok->set_dsize(_data_rec.data_size());
+ // Set the record size in dtokp
+ dtokp->set_rstate(data_tok::READ);
+ dtokp->set_dsize(_data_rec.data_size());
return RHM_IORES_SUCCESS;
}
@@ -241,6 +338,48 @@
rotate_page();
}
+// Skip over the record currently described by _data_rec (used for enqueue records
+// that have already been dequeued), advancing the page offset and rotating pages
+// as required.
+//
+// dtokp->dblocks_read() holds the running count of dblks skipped so far, so a skip
+// that has to wait for AIO (rstate SKIP_PART) resumes where it stopped on the next
+// call.
+//
+// Returns RHM_IORES_AIO_WAIT if the next page needed is not yet AIO_COMPLETE
+// (dtokp left in SKIP_PART). Otherwise returns RHM_IORES_SUCCESS with dtokp back
+// in the UNREAD state and its read count reset to 0, so that the following
+// read_enq() starts decoding the next record at offset 0.
+const iores
+rmgr::skip(data_tok* dtokp) throw (jexception)
+{
+    const u_int32_t rec_dblks = _data_rec.rec_size_dblks();
+    // On resumption pick up the count already skipped; otherwise start from 0.
+    u_int32_t tot_dblk_cnt = dtokp->rstate() == data_tok::SKIP_PART ? dtokp->dblocks_read() : 0;
+    dtokp->set_dblocks_read(tot_dblk_cnt);
+    while (true)
+    {
+        // Skip only what is left of this record, limited by what is left of this page.
+        const u_int32_t rec_rem_dblks = rec_dblks > tot_dblk_cnt ? rec_dblks - tot_dblk_cnt : 0;
+        const u_int32_t this_dblk_cnt = rec_rem_dblks > dblks_rem() ? dblks_rem() : rec_rem_dblks;
+        dtokp->incr_dblocks_read(this_dblk_cnt);
+        _pg_offset_dblks += this_dblk_cnt;
+        tot_dblk_cnt += this_dblk_cnt;
+        if (tot_dblk_cnt >= rec_dblks)
+        {
+            // Skip complete, put state back to unread
+            dtokp->set_rstate(data_tok::UNREAD);
+            dtokp->set_dblocks_read(0);
+            // If we have finished with this page, rotate it (as read_enq() does)
+            if (dblks_rem() == 0)
+                rotate_page();
+            return RHM_IORES_SUCCESS;
+        }
+        // Record continues on the next page
+        rotate_page();
+        if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
+        {
+            // The running count is already in dtokp->dblocks_read(); just mark the
+            // token so that the next read() call resumes the skip.
+            dtokp->set_rstate(data_tok::SKIP_PART);
+            return RHM_IORES_AIO_WAIT;
+        }
+    }
+}
+
void
rmgr::aio_cycle() throw (jexception)
{
@@ -276,11 +415,15 @@
void
rmgr::init_aio_reads(const int16_t first_uninit, const u_int16_t num_uninit) throw (jexception)
{
+//std::cout << " #(" << first_uninit << "," << num_uninit << ")" << std::flush;
for (int16_t i=0; i<num_uninit; i++)
{
+//std::cout << ">pi=" << ((i + first_uninit) % _pages) << "-wc=" << (_rrfc.file_handle()->wr_cmpl_cnt_dblks()) << "-fid=" << (_rrfc.file_handle()->fid()) << std::flush;
if (_rrfc.empty()) // Nothing to do; this file not yet written to
- break;
-
+//{std::cout << "!" << std::flush;
+ break;
+//}
+
// If this is the first read from a file, increase the read pointers to beyond fhdr
// or consume fhdr here for analysis (not req'd at present)
if (_rrfc.subm_offs() == 0)
@@ -297,6 +440,7 @@
u_int32_t rd_size = file_rem_dblks > pg_size_dblks ? pg_size_dblks : file_rem_dblks;
if (rd_size)
{
+//std::cout << " <frd=" << file_rem_dblks << ">" << std::flush;
int16_t pi = (i + first_uninit) % _pages;
// TODO: For perf, combine contiguous pages into single read
// 1 or 2 AIOs needed depending on whether read block folds
@@ -309,9 +453,17 @@
_aio_evt_rem++;
_page_cb_arr[pi]._state = AIO_PENDING;
_page_cb_arr[pi]._rfh = _rrfc.file_handle();
+//std::cout << "{r^" << pi << "}" << std::flush;
}
- if (_rrfc.is_compl())
+ else // If there is nothing to read for this page, neither will there be for the others...
+//{std::cout << "&" << std::flush;
+ break;
+//}
+ if (_rrfc.file_rotate())
+ {
+//std::cout << " @@@@@" << std::flush;
_rrfc.rotate();
+ }
}
}
@@ -329,10 +481,13 @@
+// Move on to the next read cache page: zero the _rdblks count of the page being
+// left, rotate via pmgr::rotate_page(), run an AIO cycle, and restart the page
+// offset at 0.
void
rmgr::rotate_page()
{
+ _page_cb_arr[_pg_index]._rdblks = 0;
pmgr::rotate_page();
aio_cycle();
_pg_offset_dblks = 0;
// This counter is for bookkeeping only, page rotates are handled directly in init_aio_reads()
+ // FIXME: _pg_cntr should be sync'd with aio ops, not use of page as it is now...
+ // Need to move reset into the if (_rrfc.file_rotate()) test in init_aio_reads().
if (_pg_cntr >= (JRNL_FILE_SIZE / JRNL_RMGR_PAGE_SIZE))
_pg_cntr = 0;
}
Modified: store/trunk/cpp/lib/jrnl/rmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/rmgr.hpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -63,7 +63,8 @@
public:
rmgr(jcntl* jc, enq_map& emap, rrfc& rrfc);
- rmgr(jcntl* jc, enq_map& emap, rrfc& rrfc, std::deque<data_tok*>* const dtokl) throw (jexception);
+ rmgr(jcntl* jc, enq_map& emap, rrfc& rrfc, std::deque<data_tok*>* const dtokl)
+ throw (jexception);
~rmgr();
void initialize(std::deque<data_tok*>* const dtokl, const aio_cb rd_cb) throw (jexception);
@@ -76,6 +77,7 @@
const iores read_enq(hdr& h, void* rptr, data_tok* dtok) throw (jexception);
void consume_deq() throw (jexception);
void consume_filler() throw (jexception);
+ const iores skip(data_tok* dtokp) throw (jexception);
void aio_cycle() throw (jexception);
void init_aio_reads(const int16_t first_uninit, const u_int16_t num_uninit)
throw (jexception);
Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -117,10 +117,14 @@
inline const u_int32_t add_cmpl_cnt_dblks(u_int32_t a) throw (jexception)
{ return _curr_fh->add_rd_cmpl_cnt_dblks(a); }
- inline const u_int32_t remaining_dblks() const throw (jexception)
- { return _curr_fh->rd_remaining_dblks(); }
- inline const bool is_compl() const throw (jexception) { return _curr_fh->is_rd_compl(); };
- inline const bool empty() const { return _curr_fh->empty(); }
+ inline const bool empty() const { return _curr_fh->rd_empty(); }
+ inline const u_int32_t remaining_dblks() { return _curr_fh->rd_remaining_dblks(); }
+ inline const bool is_full() const { return _curr_fh->is_rd_full(); }
+ inline const bool is_compl() const { return _curr_fh->is_rd_compl(); }
+ inline const u_int32_t aio_outstanding_dblks()
+ { return _curr_fh->rd_aio_outstanding_dblks(); }
+ inline const bool file_rotate() const { return _curr_fh->rd_file_rotate(); }
+
inline const bool will_fit(const size_t size) const { return _curr_fh->will_fit(size); }
}; // class rrfc
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -108,7 +108,7 @@
u_int64_t rid;
if (dtok->getSourceMessage())
{
- rid = dtok->rid();
+ rid = dtok->rid();
assert(rid != 0);
}
else
@@ -122,7 +122,7 @@
{
assert(_pg_offset_dblks < JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE);
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
- u_int32_t data_offs_dblks = dtok->dblocks_proc();
+ u_int32_t data_offs_dblks = dtok->dblocks_written();
u_int32_t ret = _data_rec.encode(wptr, data_offs_dblks,
(JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
@@ -134,10 +134,10 @@
#endif
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
- dtok->incr_dblocks_proc(ret);
+ dtok->incr_dblocks_written(ret);
// Is the encoding of this record complete?
- if (dtok->dblocks_proc() >= _data_rec.rec_size_dblks())
+ if (dtok->dblocks_written() >= _data_rec.rec_size_dblks())
{
// TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO returns.
dtok->set_wstate(data_tok::ENQ_SUBM);
@@ -180,6 +180,7 @@
done = true;
}
+ // File full?
if (_pg_cntr >= (JRNL_FILE_SIZE / JRNL_WMGR_PAGE_SIZE))
{
iores rfres = rotate_file();
@@ -217,11 +218,11 @@
u_int64_t rid;
if (dtok->getSourceMessage())
{
- rid = dtok->dequeue_rid();
+ rid = dtok->dequeue_rid();
assert(rid != 0);
}
else
- rid = _wrfc.get_incr_rid();
+ rid = _wrfc.get_incr_rid();
//***
@@ -304,6 +305,7 @@
_wrfc.add_subm_cnt_dblks(_cached_offset_dblks);
_aio_evt_rem++;
_cached_offset_dblks = 0;
+//std::cout << "{w^" << _pg_index << "}" << std::flush;
// Rotate cache page?
if (_pg_offset_dblks >= JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE)
@@ -363,8 +365,9 @@
ss << " offset=" << iocbp->u.c.offset << " fh=" << iocbp->aio_fildes << "]";
throw jexception(jerrno::JERR__AIO, ss.str(), "wmgr", "get_events");
}
- if (pcbp) // File header writes have no pcb
+ if (pcbp) // File header writes have no pcb, and will jump to else
{
+//std::cout << "{w*" << pcbp->_index << "}" << std::flush;
u_int32_t s = pcbp->_pdtokl->size();
for (u_int32_t k=0; k<s; k++)
{
Modified: store/trunk/cpp/lib/jrnl/wrfc.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/wrfc.cpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -74,6 +74,7 @@
if (_fh_index == _nfiles)
_fh_index = 0;
_curr_fh = _fh_arr[_fh_index];
+//std::cout << "wrfc::rotate() RESET fid=" << _fh_index << std::flush;
return reset(); //Checks if file is still in use (ie not fully dequeued yet)
}
Modified: store/trunk/cpp/lib/jrnl/wrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wrfc.hpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/lib/jrnl/wrfc.hpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -101,9 +101,13 @@
inline const u_int32_t add_cmpl_cnt_dblks(u_int32_t a) throw (jexception)
{ return _curr_fh->add_wr_cmpl_cnt_dblks(a); }
- inline const u_int32_t remaining_dblks() const throw (jexception)
- { return _curr_fh->wr_remaining_dblks(); }
- inline const bool is_compl() const throw (jexception) { return _curr_fh->is_wr_compl(); };
+ inline const bool empty() const { return _curr_fh->wr_empty(); }
+ inline const u_int32_t remaining_dblks() const { return _curr_fh->wr_remaining_dblks(); }
+ inline const bool is_full() const { return _curr_fh->is_wr_full(); };
+ inline const bool is_compl() const { return _curr_fh->is_wr_compl(); };
+ inline const u_int32_t aio_outstanding_dblks() const
+ { return _curr_fh->wr_aio_outstanding_dblks(); }
+ inline const bool file_rotate() const { return _curr_fh->wr_file_rotate(); }
}; // class wrfc
} // namespace journal
Modified: store/trunk/cpp/tests/jrnl/JournalTest.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalTest.cpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/tests/jrnl/JournalTest.cpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -64,6 +64,8 @@
CPPUNIT_TEST(Test_024);
CPPUNIT_TEST(Test_025);
CPPUNIT_TEST(Test_026);
+ CPPUNIT_TEST(Test_027);
+ CPPUNIT_TEST(Test_028);
CPPUNIT_TEST_SUITE_END();
jtest t;
@@ -100,136 +102,148 @@
void Test_001()
{
- runJournalTest(1, 10, 10, false, 5, "1*(10 bytes)");
+ runJournalTest(1, 10, 10, false, 2, "1*(10 bytes)");
}
void Test_002()
{
- runJournalTest(1, 10, 10, true, 5, "1*(10 bytes), auto-deq");
+ runJournalTest(1, 10, 10, true, 2, "1*(10 bytes), auto-deq");
}
void Test_003()
{
- runJournalTest(10, 10, 10, false, 5, "10*(10 bytes)");
+ runJournalTest(10, 10, 10, false, 2, "10*(10 bytes)");
}
void Test_004()
{
- runJournalTest(10, 10, 10, true, 5, "10*(10 bytes), auto-deq");
+ runJournalTest(10, 10, 10, true, 2, "10*(10 bytes), auto-deq");
}
void Test_005()
{
- runJournalTest(10, 92, 92, false, 5, "10*(1 dblk exact fit)");
+ runJournalTest(10, 92, 92, false, 2, "10*(1 dblk exact fit)");
}
void Test_006()
{
- runJournalTest(10, 92, 92, true, 5, "10*(1 dblk exact fit), auto-deq");
+ runJournalTest(10, 92, 92, true, 2, "10*(1 dblk exact fit), auto-deq");
}
void Test_007()
{
- runJournalTest(10, 93, 93, false, 5, "10*(1 dblk + 1 byte)");
+ runJournalTest(10, 93, 93, false, 2, "10*(1 dblk + 1 byte)");
}
void Test_008()
{
- runJournalTest(10, 93, 93, true, 5, "10*(1 dblk + 1 byte), auto-deq");
+ runJournalTest(10, 93, 93, true, 2, "10*(1 dblk + 1 byte), auto-deq");
}
void Test_009()
{
- runJournalTest(10, 476, 476, false, 5, "10*(1 sblk exact fit)");
+ runJournalTest(10, 476, 476, false, 2, "10*(1 sblk exact fit)");
}
void Test_010()
{
- runJournalTest(10, 476, 476, true, 5, "10*(1 sblk exact fit), auto-deq");
+ runJournalTest(10, 476, 476, true, 2, "10*(1 sblk exact fit), auto-deq");
}
void Test_011()
{
- runJournalTest(10, 477, 477, false, 5, "10*(1 sblk + 1 byte)");
+ runJournalTest(10, 477, 477, false, 2, "10*(1 sblk + 1 byte)");
}
void Test_012()
{
- runJournalTest(10, 477, 477, true, 5, "10*(1 sblk + 1 byte), auto-deq");
+ runJournalTest(10, 477, 477, true, 2, "10*(1 sblk + 1 byte), auto-deq");
}
void Test_013()
{
- runJournalTest(8, 4060, 4060, false, 5, "8*(1/8 page)");
+ runJournalTest(8, 4060, 4060, false, 2, "8*(1/8 page)");
}
void Test_014()
{
- runJournalTest(9, 4060, 4060, false, 5, "9*(1/8 page)");
+ runJournalTest(9, 4060, 4060, false, 2, "9*(1/8 page)");
}
void Test_015()
{
- runJournalTest(8, 4061, 4061, false, 5, "8*(1/8 page + 1 byte)");
+ runJournalTest(8, 4061, 4061, false, 2, "8*(1/8 page + 1 byte)");
}
void Test_016()
{
- runJournalTest(8, 3932, 3932, true, 5, "8*(1/8 page - 1 dblk for deq record), auto-deq");
+ runJournalTest(8, 3932, 3932, true, 2, "8*(1/8 page - 1 dblk for deq record), auto-deq");
}
void Test_017()
{
- runJournalTest(9, 3932, 3932, true, 5, "9*(1/8 page - 1 dblk for deq record), auto-deq");
+ runJournalTest(9, 3932, 3932, true, 2, "9*(1/8 page - 1 dblk for deq record), auto-deq");
}
void Test_018()
{
- runJournalTest(8, 3933, 3933, true, 5,
+ runJournalTest(8, 3933, 3933, true, 2,
"8*(1/8 page - 1 dblk for deq record + 1 byte), auto-deq");
}
void Test_019()
{
- runJournalTest(32, 32732, 32732, false, 5, "32*(1 page exact fit)");
+ runJournalTest(32, 32732, 32732, false, 2, "32*(1 page exact fit)");
}
void Test_020()
{
- runJournalTest(33, 32732, 32732, false, 10, "33*(1 page exact fit)");
+ runJournalTest(33, 32732, 32732, false, 2, "33*(1 page exact fit)");
}
void Test_021()
{
- runJournalTest(22, 49116, 49116, false, 10, "22*(1.5 pages)");
+ runJournalTest(22, 49116, 49116, false, 2, "22*(1.5 pages)");
}
void Test_022()
{
- runJournalTest(22, 48988, 48988, true, 10,
+ runJournalTest(22, 48988, 48988, true, 2,
"22*(1.5 pages - 1 dblk for deq record), auto-deq");
}
void Test_023()
{
- runJournalTest(48, 32732, 32732, false, 10, "48*(1 page exact fit)");
+ runJournalTest(48, 32732, 32732, false, 2, "48*(1 page exact fit)");
}
void Test_024()
{
- runJournalTest(49, 32732, 32732, false, 10, "49*(1 page exact fit)");
+ runJournalTest(49, 32732, 32732, false, 2, "49*(1 page exact fit)");
}
void Test_025()
{
- runJournalTest(20, 81884, 81884, false, 10, "20*(2.5 pages)");
+ runJournalTest(20, 81884, 81884, false, 2, "20*(2.5 pages)");
}
void Test_026()
{
- runJournalTest(20, 81756, 81756, true, 10,
+ runJournalTest(20, 81756, 81756, true, 2,
"20*(2.5 pages - 1 dblk for deq record), auto-deq");
}
+
+ void Test_027()
+ {
+ runJournalTest(16, 786268, 786268, true, 2,
+ "16*(24 pages = 1/2 file); Total = 8 files exactly (full journal filespace)");
+ }
+
+ void Test_028()
+ {
+ runJournalTest(17, 786268, 786268, true, 2,
+ "17*(24 pages = 1/2 file); Total = 8 files + file 0 overwritten by 1/2 file");
+ }
};
// Make this test suite a plugin.
Modified: store/trunk/cpp/tests/jrnl/Makefile.am
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.am 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/tests/jrnl/Makefile.am 2007-09-14 17:10:49 UTC (rev 921)
@@ -21,7 +21,8 @@
abs_builddir=@abs_builddir@
-AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) $(QPID_CXXFLAGS) -pthread -DRHM_CLEAN -DRHM_WRONLY -DRHM_TESTVALS
+AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) $(QPID_CXXFLAGS) -pthread
+#AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) $(QPID_CXXFLAGS) -pthread -DRHM_CLEAN -DRHM_WRONLY -DRHM_TESTVALS
INCLUDES=-I../../lib
Modified: store/trunk/cpp/tests/jrnl/Makefile.rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-14 17:10:49 UTC (rev 921)
@@ -122,6 +122,5 @@
clean-data:
@rm -rf jdata
- @rm -rf rd_test_jrnls
clean-all: clean clean-data
Modified: store/trunk/cpp/tests/jrnl/jtest.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtest.cpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/tests/jrnl/jtest.cpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -123,6 +123,7 @@
p_args = new msg_producer::_p_args(_jc, ta._num_msgs, ta._min_msg_size, ta._max_msg_size,
ta._auto_deq);
c_args = new msg_consumer::_c_args(_jc, ta._num_msgs, ta._min_msg_size, ta._max_msg_size);
+
#ifndef RHM_RDONLY
_mp.initialize(p_args);
#endif
@@ -130,6 +131,16 @@
_mc.initialize(c_args);
#endif
_jc.initialize(&_mc.aio_dtokl(), NULL, &_mp.aio_dtokl(), &mp_aio_cb);
+
+ assert(_dtok_master_list.size() == 0);
+ for (u_int32_t i=0; i<p_args->_num_msgs; i++)
+ {
+ rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+#ifdef RHM_RDONLY
+ dtp->set_wstate(rhm::journal::data_tok::ENQ); // Assume all msgs enqueued OK
+#endif
+ _dtok_master_list.push_back(dtp);
+ }
// gettimeofday(_end_time, _tz_ptr);
// string str;
@@ -147,22 +158,30 @@
// Serial execution - uncomment these lines...
#ifndef RHM_RDONLY
- produce(p_args);
+// produce(p_args);
#ifndef RHM_WRONLY
// std::cout << "-----" << std::endl;
- usleep(500000); // 0.5 sec
+// usleep(500000); // 0.5 sec
#endif
#endif
#ifndef RHM_WRONLY
- consume(c_args);
+// consume(c_args);
#endif
// Concurrent execution - uncomment these lines...
-// pthread_create(&_p_thread, NULL, produce, (void*)&p_arg_arr[_tnum]);
-// pthread_create(&_c_thread, NULL, consume, (void*)&c_args);
-// pthread_join(_p_thread, NULL);
-// pthread_join(_c_thread, NULL);
+#ifndef RHM_RDONLY
+ pthread_create(&_p_thread, NULL, produce, (void*)p_args);
+#endif
+#ifndef RHM_WRONLY
+ pthread_create(&_c_thread, NULL, consume, (void*)c_args);
+#endif
+#ifndef RHM_RDONLY
+ pthread_join(_p_thread, NULL);
+#endif
+#ifndef RHM_WRONLY
+ pthread_join(_c_thread, NULL);
+#endif
if (p_args->_err)
if (p_args->_err != 1) // This is the zero msg length err, ignore
@@ -196,6 +215,15 @@
delete c_args;
c_args = NULL;
}
+ for (u_int32_t i=0; i<_dtok_master_list.size(); i++)
+ {
+ if (_dtok_master_list[i])
+ {
+ delete _dtok_master_list[i];
+ _dtok_master_list[i] = NULL;
+ }
+ }
+ _dtok_master_list.clear();
}
string&
@@ -211,10 +239,11 @@
}
// static variable declarations
+std::deque<rhm::journal::data_tok*> jtest::_dtok_master_list;
msg_producer::_p_args* jtest::p_args = NULL;
msg_consumer::_c_args* jtest::c_args = NULL;
-msg_producer jtest::_mp;
-msg_consumer jtest::_mc;
+msg_producer jtest::_mp(_dtok_master_list);
+msg_consumer jtest::_mc(_dtok_master_list);
rhm::journal::jcntl jtest::_jc(JOURNAL_ID, JOURNAL_DIR, JOURNAL_BASE_FILENAME);
// static method
Modified: store/trunk/cpp/tests/jrnl/jtest.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtest.hpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/tests/jrnl/jtest.hpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -39,9 +39,7 @@
#define JOURNAL_ID "Test Journal"
#define JOURNAL_DIR "jdata"
#define JOURNAL_BASE_FILENAME "test"
-//#define JRNL_TEST_CSV_FILE "test/wtests.csv"
-//#define NUM_TESTS 76
#define MAX_LINE_LEN 1024
using namespace std;
@@ -65,6 +63,7 @@
static msg_producer::_p_args* p_args;
static msg_consumer::_c_args* c_args;
static rhm::journal::jcntl _jc;
+ static std::deque<rhm::journal::data_tok*> _dtok_master_list;
struct timeval* _start_time;
struct timeval* _end_time;
Modified: store/trunk/cpp/tests/jrnl/msg_consumer.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_consumer.cpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/tests/jrnl/msg_consumer.cpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -46,7 +46,8 @@
_err(0)
{}
-msg_consumer::msg_consumer():
+msg_consumer::msg_consumer(std::deque<rhm::journal::data_tok*>& dtok_master_list):
+ _dtok_master_list(dtok_master_list),
_tot_dblks(0),
_tot_dsize(0)
{
@@ -70,13 +71,13 @@
msg_consumer::initialize(const u_int32_t numMsgs)
{
_num_msgs = numMsgs;
- for (u_int32_t i=0; i<_num_msgs; i++)
- {
- rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
- dtp->set_wstate(rhm::journal::data_tok::ENQ); // Assume all msgs enqueued OK
- _dtok_master_list.push_back(dtp);
-
- }
+// for (u_int32_t i=0; i<_num_msgs; i++)
+// {
+// rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
+// dtp->set_wstate(rhm::journal::data_tok::ENQ); // Assume all msgs enqueued OK
+// _dtok_master_list.push_back(dtp);
+//
+// }
}
void
@@ -92,15 +93,15 @@
_tot_dblks = 0;
_tot_dsize = 0;
_aio_cmpl_dtok_list.clear();
- for (u_int32_t i=0; i<_dtok_master_list.size(); i++)
- {
- if (_dtok_master_list[i])
- {
- delete _dtok_master_list[i];
- _dtok_master_list[i] = NULL;
- }
- }
- _dtok_master_list.clear();
+// for (u_int32_t i=0; i<_dtok_master_list.size(); i++)
+// {
+// if (_dtok_master_list[i])
+// {
+// delete _dtok_master_list[i];
+// _dtok_master_list[i] = NULL;
+// }
+// }
+// _dtok_master_list.clear();
}
u_int32_t
@@ -113,62 +114,79 @@
for (msgCntr = 0; msgCntr < num_msgs && !_interrupt_flag; msgCntr++)
{
rhm::journal::data_tok* dtokp = _dtok_master_list[msgCntr];
+//std::cout << " R" << msgCntr << " " << std::flush;
unsigned aio_sleep_cnt = 0;
unsigned jempty_sleep_cnt = 0;
+ unsigned enq_wait_cnt = 0;
bool read = false;
while (!read)
{
- rhm::journal::iores res = _jcntl.read_data((void* const)_msg_buff, buffSize, dtokp);
- rhm::journal::data_tok::read_state rs = dtokp->rstate();
- rhm::journal::data_tok::write_state ws = dtokp->wstate();
- switch (res)
+ if (dtokp->wstate() >= rhm::journal::data_tok::ENQ)
{
- case rhm::journal::RHM_IORES_SUCCESS:
- if (rs != rhm::journal::data_tok::READ || ws != rhm::journal::data_tok::ENQ)
- std::cout << "###### Unexpected state: res=" << iores_str[res] << " ws=" <<
- dtokp->wstate_str() << "; rs=" << dtokp->rstate_str();
- data_size = dtokp->dsize();
- _msg_buff[data_size] = 0; // Make msg into a c-string
- read = true;
- _tot_dblks += dtokp->dblocks_proc();
- _tot_dsize += data_size;
+ rhm::journal::iores res = _jcntl.read_data((void* const)_msg_buff, buffSize, dtokp);
+ rhm::journal::data_tok::read_state rs = dtokp->rstate();
+ rhm::journal::data_tok::write_state ws = dtokp->wstate();
+ switch (res)
+ {
+ case rhm::journal::RHM_IORES_SUCCESS:
+ if (rs != rhm::journal::data_tok::READ || ws != rhm::journal::data_tok::ENQ)
+ std::cout << "msg_consumer::consume(): Unexpected state: res=" <<
+ iores_str[res] << " ws=" << dtokp->wstate_str() << "; rs=" <<
+ dtokp->rstate_str();
+ data_size = dtokp->dsize();
+ _msg_buff[data_size] = 0; // Make msg into a c-string
+ read = true;
+ _tot_dblks += dtokp->dblocks_read();
+ _tot_dsize += data_size;
- // Comment out these for performance checks/measurements
- check_msg(msgCntr, data_size, min_msg_size, max_msg_size, true, _msg_buff);
- //print_dbug(msgCntr, data_size, _msg_buff, true);
- break;
- case rhm::journal::RHM_IORES_AIO_WAIT:
+ // Comment out these for performance checks/measurements
+ check_msg(msgCntr, data_size, min_msg_size, max_msg_size, true, _msg_buff);
+ //print_dbug(msgCntr, data_size, _msg_buff, true);
+ break;
+ case rhm::journal::RHM_IORES_AIO_WAIT:
//std::cout << "a" << std::flush;
- if (rs == rhm::journal::data_tok::READ ||
- ws != rhm::journal::data_tok::ENQ)
- std::cout << "###### Unexpected state: res=" << iores_str[res] << " ws=" <<
- dtokp->wstate_str() << "; rs=" << dtokp->rstate_str();
- if (aio_sleep_cnt >= MAX_AIO_SLEEPS)
- throw rhm::journal::jexception(EXCEPTION_BASE+0,
- "Page cache full (AIO events outstanding for all pages); "
- "exceeced wait time for pages to free.", "msg_consumer",
- "consume");
- usleep(AIO_SLEEP_TIME);
- aio_sleep_cnt++;
- break;
- case rhm::journal::RHM_IORES_EMPTY:
+ if (rs == rhm::journal::data_tok::READ ||
+ ws != rhm::journal::data_tok::ENQ)
+ std::cout << "msg_consumer::consume(): Unexpected state: res=" <<
+ iores_str[res] << " ws=" << dtokp->wstate_str() << "; rs=" <<
+ dtokp->rstate_str();
+ if (aio_sleep_cnt >= MAX_AIO_SLEEPS)
+ throw rhm::journal::jexception(EXCEPTION_BASE+0,
+ "Page cache full (AIO events outstanding for all pages); "
+ "exceeced wait time for pages to free.", "msg_consumer",
+ "consume");
+ usleep(AIO_SLEEP_TIME);
+ aio_sleep_cnt++;
+ break;
+ case rhm::journal::RHM_IORES_EMPTY:
//std::cout << "e" << std::flush;
- if (rs == rhm::journal::data_tok::READ ||
- ws != rhm::journal::data_tok::ENQ)
- std::cout << "Unexpected state: res=" << iores_str[res] << " ws=" <<
- dtokp->wstate_str() << "; rs=" << dtokp->rstate_str();
- if (jempty_sleep_cnt >= MAX_EMPTY_SLEEPS)
- throw rhm::journal::jexception(EXCEPTION_BASE+1,
- "Journal empty (no further data written to journal); "
- "exceeded wait time for journal to fill.", "msg_consumer",
- "consume");
- usleep(EMPTY_SLEEP_TIME);
- jempty_sleep_cnt++;
- break;
- default:
- std::cout << "###### Unexpected msg state: id=" << dtokp->id() << " ms=" <<
- dtokp->wstate_str() << " res=" << iores_str[res] << std::flush;
+ if (rs == rhm::journal::data_tok::READ ||
+ ws != rhm::journal::data_tok::ENQ)
+ std::cout << "msg_consumer::consume(): Unexpected state: res=" <<
+ iores_str[res] << " ws=" << dtokp->wstate_str() << "; rs=" <<
+ dtokp->rstate_str();
+ if (jempty_sleep_cnt >= MAX_EMPTY_SLEEPS)
+ throw rhm::journal::jexception(EXCEPTION_BASE+1,
+ "Journal empty (no further data written to journal); "
+ "exceeded wait time for journal to fill.", "msg_consumer",
+ "consume");
+ usleep(EMPTY_SLEEP_TIME);
+ jempty_sleep_cnt++;
+ break;
+ default:
+ std::cout << "msg_consumer::consume(): Unexpected msg state: id=" <<
+ dtokp->id() << " ms=" << dtokp->wstate_str() << " res=" <<
+ iores_str[res] << std::flush;
+ }
}
+ else
+ {
+ if (enq_wait_cnt > MAX_ENQ_WAIT_SLEEPS)
+ throw rhm::journal::jexception(EXCEPTION_BASE+2,
+ "Exceeded max. wait time for next read token to reach state ENQ.",
+ "msg_consumer", "consume");
+ usleep(ENQ_WAIT_SLEEP_TIME);
+ }
}
}
_interrupt_flag = false;
@@ -208,7 +226,7 @@
ss << " is valid, but exptected size must lie between " << min_msg_size;
ss << " and " << max_msg_size;
}
- throw rhm::journal::jexception(EXCEPTION_BASE+2, ss.str(), "msg_consumer", "check_msg");
+ throw rhm::journal::jexception(EXCEPTION_BASE+3, ss.str(), "msg_consumer", "check_msg");
}
if (chk_data)
{
@@ -220,7 +238,7 @@
std::stringstream ss;
ss << "Message " << msgCntr << " failed content check at char " << i;
ss << ": found \'" << buff[i] << "\'; expected \'" << expected << "\'";
- throw rhm::journal::jexception(EXCEPTION_BASE+3, ss.str(), "msg_consumer",
+ throw rhm::journal::jexception(EXCEPTION_BASE+4, ss.str(), "msg_consumer",
"check_msg");
}
}
Modified: store/trunk/cpp/tests/jrnl/msg_consumer.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_consumer.hpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/tests/jrnl/msg_consumer.hpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -34,9 +34,11 @@
#define MAX_AIO_SLEEPS 500
#define AIO_SLEEP_TIME 1000
+#define MAX_ENQ_WAIT_SLEEPS 1000
#define MAX_EMPTY_SLEEPS 1000
#define EMPTY_SLEEP_TIME 10000
+#define ENQ_WAIT_SLEEP_TIME 1000
class msg_consumer
{
@@ -60,13 +62,13 @@
static char _msg_buff[];
static bool _interrupt_flag;
u_int32_t _num_msgs;
- std::deque<rhm::journal::data_tok*> _dtok_master_list; // One dtok per msg to be received
+ std::deque<rhm::journal::data_tok*>& _dtok_master_list; // One dtok per msg to be received
std::deque<rhm::journal::data_tok*> _aio_cmpl_dtok_list; // Dtoks from completed AIOs go here
u_int64_t _tot_dblks;
u_int64_t _tot_dsize;
public:
- msg_consumer();
+ msg_consumer(std::deque<rhm::journal::data_tok*>& dtok_master_list);
~msg_consumer();
void initialize(const u_int32_t numMsgs);
Modified: store/trunk/cpp/tests/jrnl/msg_producer.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_producer.cpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/tests/jrnl/msg_producer.cpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -45,13 +45,14 @@
_err(0)
{}
-msg_producer::msg_producer():
+msg_producer::msg_producer(std::deque<rhm::journal::data_tok*>& dtok_master_list):
_jcptr(NULL),
_num_msgs(0),
_num_msgs_enq_subm(0),
_num_msgs_enq(0),
_num_msgs_deq_subm(0),
_num_msgs_deq(0),
+ _dtok_master_list(dtok_master_list),
_tot_dblks(0),
_tot_dsize(0)
{
@@ -80,8 +81,8 @@
{
_num_msgs = numMsgs;
_auto_dequeue = auto_dequeue;
- for (u_int32_t i=0; i<_num_msgs; i++)
- _dtok_master_list.push_back(new rhm::journal::data_tok);
+// for (u_int32_t i=0; i<_num_msgs; i++)
+// _dtok_master_list.push_back(new rhm::journal::data_tok);
}
void
@@ -102,15 +103,15 @@
_tot_dsize = 0;
_aio_cmpl_dtok_list.clear();
_dd_dtok_list.clear();
- for (u_int32_t i=0; i<_dtok_master_list.size(); i++)
- {
- if (_dtok_master_list[i])
- {
- delete _dtok_master_list[i];
- _dtok_master_list[i] = NULL;
- }
- }
- _dtok_master_list.clear();
+// for (u_int32_t i=0; i<_dtok_master_list.size(); i++)
+// {
+// if (_dtok_master_list[i])
+// {
+// delete _dtok_master_list[i];
+// _dtok_master_list[i] = NULL;
+// }
+// }
+// _dtok_master_list.clear();
}
u_int32_t
@@ -137,7 +138,7 @@
}
rhm::journal::data_tok* dtokp = _dtok_master_list[msgCntr];
const void* const msg = (char*)_msg_buff + (msgCntr%10);
-//std::cout << "e" << dtokp->id() << /*"-" << dtokp->wstate_str() <<*/ " " << std::flush;
+//std::cout << " E" << dtokp->id() << /*"-" << dtokp->wstate_str() <<*/ " " << std::flush;
bool written = false;
while (!written)
{
@@ -150,17 +151,19 @@
if (ws >= rhm::journal::data_tok::ENQ_SUBM)
{
written = true;
- _tot_dblks += dtokp->dblocks_proc();
+ _tot_dblks += dtokp->dblocks_written();
_tot_dsize += dtokp->dsize();
}
else
- std::cout << "Unexpected msg state: id=" << dtokp->id() << " ws=" <<
- wsstr << " eres=" << iores_str[eres] << std::flush;
+ std::cout << "msg_producer::produce() Unexpected msg state: id=" <<
+ dtokp->id() << " ws=" << wsstr << " eres=" << iores_str[eres] <<
+ std::flush;
break;
case rhm::journal::RHM_IORES_AIO_WAIT:
if (ws >= rhm::journal::data_tok::ENQ_SUBM)
- std::cout << "Unexpected msg state: id=" << dtokp->id() << " ws=" <<
- wsstr << " eres=" << iores_str[eres] << std::flush;
+ std::cout << "msg_producer::produce() Unexpected msg state: id=" <<
+ dtokp->id() << " ws=" << wsstr << " eres=" << iores_str[eres] <<
+ std::flush;
if (aio_sleep_cnt >= MAX_AIO_SLEEPS)
throw rhm::journal::jexception(EXCEPTION_BASE+1,
"Page cache full (AIO events outstanding for all pages); "
@@ -173,8 +176,12 @@
break;
case rhm::journal::RHM_IORES_FULL:
if (ws >= rhm::journal::data_tok::ENQ_SUBM)
- std::cout << "Unexpected msg state: id=" << dtokp->id() << " ws=" <<
- wsstr << " eres=" << iores_str[eres] << std::flush;
+ {
+ written = true;
+ _tot_dblks += dtokp->dblocks_written();
+ _tot_dsize += dtokp->dsize();
+ std::cout << "WARNING: Journal full." << std::flush;
+ }
if (jfull_sleep_cnt >= MAX_FULL_SLEEPS)
throw rhm::journal::jexception(EXCEPTION_BASE+2,
"Journal full (next file to write still has undequeued data); "
@@ -185,8 +192,9 @@
jfull_sleep_cnt++;
break;
default:
- std::cout << "Unexpected msg state: id=" << dtokp->id() << " ws=" <<
- wsstr << " eres=" << iores_str[eres] << std::flush;
+ std::cout << "msg_producer::produce() Unexpected msg state: id=" <<
+ dtokp->id() << " ws=" << wsstr << " eres=" << iores_str[eres] <<
+ std::flush;
}
//print_dbug(msgCntr, size, (char*)msg, true);
@@ -195,7 +203,21 @@
// Submit deferred dequeues (if any)
if (_auto_dequeue)
+ {
+//std::cout << "auto_dequeue: " << _dd_dtok_list.size() << " dequeues waiting. " << std::flush;
+ u_int32_t cnt = 0;
send_deferred_dequeues(jc);
+ while (!_dd_dtok_list.empty() && !_interrupt_flag)
+ {
+ if (++cnt > 10000)
+ throw rhm::journal::jexception(EXCEPTION_BASE+3,
+ "Timeout waiting for all messages to be read.", "msg_producer",
+ "produce");
+ usleep(1000);
+ send_deferred_dequeues(jc);
+ }
+//std::cout << "auto_dequeue: done. " << std::flush;
+ }
}
jrnl_flush(jc, _num_msgs);
}
@@ -254,39 +276,55 @@
void
msg_producer::send_deferred_dequeues(rhm::journal::jcntl& jc)
{
- while (!_dd_dtok_list.empty())
+ std::deque<rhm::journal::data_tok*>::iterator ditr = _dd_dtok_list.begin();
+// while (!_dd_dtok_list.empty())
+ while (ditr != _dd_dtok_list.end())
{
- rhm::journal::data_tok* ddtokp = _dd_dtok_list.front();
-//std::cout << "d" << ddtokp->id() << " " << std::flush;
- unsigned aio_sleep_cnt = 0;
- bool written = false;
- while (!written)
+// rhm::journal::data_tok* ddtokp = _dd_dtok_list.front();
+ rhm::journal::data_tok* ddtokp = *ditr;
+
+#ifndef RHM_WRONLY
+ // Wait until data token is read before dequeueing
+ if (ddtokp->rstate() == rhm::journal::data_tok::READ)
+#endif
{
- rhm::journal::iores dres = _jcptr->dequeue_data(ddtokp);
- const char* wsstr = ddtokp->wstate_str();
- switch (dres)
+//std::cout << " D" << ddtokp->id() << " " << std::flush;
+ unsigned aio_sleep_cnt = 0;
+ bool written = false;
+ while (!written)
{
- case rhm::journal::RHM_IORES_SUCCESS:
- written = true;
- _num_msgs_deq_subm++;
- break;
- case rhm::journal::RHM_IORES_AIO_WAIT:
- if (aio_sleep_cnt >= MAX_AIO_SLEEPS)
- throw rhm::journal::jexception(EXCEPTION_BASE+3,
- "Page cache full (AIO events outstanding for all pages); "
- "exceeced wait time for pages to free.", "msg_producer",
- "send_deferred_dequeues");
+ rhm::journal::iores dres = _jcptr->dequeue_data(ddtokp);
+ const char* wsstr = ddtokp->wstate_str();
+ switch (dres)
+ {
+ case rhm::journal::RHM_IORES_SUCCESS:
+ written = true;
+ _num_msgs_deq_subm++;
+ break;
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+ if (aio_sleep_cnt >= MAX_AIO_SLEEPS)
+ throw rhm::journal::jexception(EXCEPTION_BASE+4,
+ "Page cache full (AIO events outstanding for all pages); "
+ "exceeced wait time for pages to free.", "msg_producer",
+ "send_deferred_dequeues");
//std::cout << "$" << dres << " " << std::flush;
- jc.get_wr_events();
- usleep(AIO_SLEEP_TIME);
- aio_sleep_cnt++;
- break;
- default:
- std::cout << "Unexpected msg state: id=" << ddtokp->id() << " ws=" <<
- wsstr << " dres=" << iores_str[dres] << std::flush;
+ jc.get_wr_events();
+ usleep(AIO_SLEEP_TIME);
+ aio_sleep_cnt++;
+ break;
+ default:
+ std::cout << "msg_producer::send_deferred_dequeues()"
+ " Unexpected msg state: id=" << ddtokp->id() << " ws=" << wsstr <<
+ " dres=" << iores_str[dres] << std::flush;
+ }
}
+// _dd_dtok_list.pop_front();
+ ditr = _dd_dtok_list.erase(ditr);
}
- _dd_dtok_list.pop_front();
+#ifndef RHM_WRONLY
+ else
+ ditr++;
+#endif
}
}
@@ -305,7 +343,7 @@
std::stringstream ss;
ss << "Journal flush phase 1 failed, _num_msgs_enq=" << _num_msgs_enq;
ss << " num_msgs_sent=" << num_msgs_sent;
- throw rhm::journal::jexception(EXCEPTION_BASE+4, ss.str(), "msg_producer",
+ throw rhm::journal::jexception(EXCEPTION_BASE+5, ss.str(), "msg_producer",
"jrnl_flush");
}
//std::cout << "+" << std::flush;
@@ -318,7 +356,19 @@
// Submit deferred dequeues (if any)
if (_auto_dequeue)
{
+//std::cout << "auto_dequeue: " << _dd_dtok_list.size() << " dequeues waiting. " << std::flush;
+ u_int32_t cnt = 0;
send_deferred_dequeues(jc);
+ while (!_dd_dtok_list.empty() && !_interrupt_flag)
+ {
+ if (++cnt > 10000)
+ throw rhm::journal::jexception(EXCEPTION_BASE+6,
+ "Timeout waiting for all messages to be read.", "msg_producer",
+ "jrnl_flush");
+ usleep(1000);
+ send_deferred_dequeues(jc);
+ }
+//std::cout << "auto_dequeue: done. " << std::flush;
// Clear any unsent dequeues in buffer, then wait for all dequeues to return
flush_cnt = 0;
@@ -332,7 +382,7 @@
std::stringstream ss;
ss << "Journal flush phase 2 failed, _num_msgs_deq=" << _num_msgs_deq;
ss << " num_msgs_sent=" << num_msgs_sent;
- throw rhm::journal::jexception(EXCEPTION_BASE+5, ss.str(), "msg_producer",
+ throw rhm::journal::jexception(EXCEPTION_BASE+7, ss.str(), "msg_producer",
"jrnl_flush");
}
//std::cout << "*" << std::flush;
Modified: store/trunk/cpp/tests/jrnl/msg_producer.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_producer.hpp 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/tests/jrnl/msg_producer.hpp 2007-09-14 17:10:49 UTC (rev 921)
@@ -69,14 +69,14 @@
u_int32_t _num_msgs_enq;
u_int32_t _num_msgs_deq_subm;
u_int32_t _num_msgs_deq;
- std::deque<rhm::journal::data_tok*> _dtok_master_list; // One dtok per msg to be sent
+ std::deque<rhm::journal::data_tok*>& _dtok_master_list; // One dtok per msg to be sent
std::deque<rhm::journal::data_tok*> _aio_cmpl_dtok_list; // Dtoks from completed AIOs go here
std::deque<rhm::journal::data_tok*> _dd_dtok_list; // Deferred dequeues
u_int64_t _tot_dblks;
u_int64_t _tot_dsize;
public:
- msg_producer();
+ msg_producer(std::deque<rhm::journal::data_tok*>& dtok_master_list);
~msg_producer();
void initialize(const u_int32_t numMsgs, bool auto_dequeue);
Modified: store/trunk/cpp/tests/jrnl/rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/rtest 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/tests/jrnl/rtest 2007-09-14 17:10:49 UTC (rev 921)
@@ -35,7 +35,7 @@
W_DO_TEST=T
W_TEST_FILE=wtests.csv
W_TEST_START=0
-W_TEST_STOP=81
+W_TEST_STOP=67
W_NITER=5
# Read test
@@ -49,14 +49,14 @@
RW_DO_TEST=T
RW_TEST_FILE=rwtests.csv
RW_TEST_START=0
-RW_TEST_STOP=1
-RW_NITER=1
+RW_TEST_STOP=49
+RW_NITER=5
RM=rm
RM_DIR="${RM} -rf"
TEST_PROG="./jtest"
CHK_PROG="./ftest.py"
-VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes"
+VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes --suppressions=/usr/lib/valgrind/glibc-2.5.supp"
MAKE="make -f Makefile.rtest"
@@ -386,6 +386,7 @@
# To generate journals for reading, we must first compile in write mode
build jwtest
+ echo -n "Creating test journal archives: "
if [[ -d ${TAR_DIR} ]]; then
rm -rf ${TAR_DIR}/*
else
@@ -400,7 +401,6 @@
local jfiles=${jfiles}" "${FILENAME}
done
- echo -n "Creating test journal archives: "
for (( f=${test_start}; f<=${test_stop}; f++ )); do
${MAKE} clean-data
run_test jrtest $f 0 ${test_file}
Modified: store/trunk/cpp/tests/jrnl/rwtests.csv
===================================================================
--- store/trunk/cpp/tests/jrnl/rwtests.csv 2007-09-14 15:13:38 UTC (rev 920)
+++ store/trunk/cpp/tests/jrnl/rwtests.csv 2007-09-14 17:10:49 UTC (rev 921)
@@ -41,27 +41,28 @@
,,,,,,
"File rollover (from file 0007 to 0000) - RHM_WRONLY req'd for auto-dequeue == FALSE",,,,,,
27,16,786396,786396,FALSE,,"16 * (24 pages = ½ file); Total = 8 files exactly"
-28,17,786396,786396,FALSE,,"17 * (24 pages = ½ file); Total = 8 files + file 0 overwritten by ½ file"
-29,16,786397,786397,FALSE,,"16 * (24 pages + 1 byte); Total = 8 files + file 0 overwritten by 16 sblks"
-30,32,786396,786396,FALSE,,"32 * (24 pages = ½ file); Total = 16 files exactly, all files overwritten once"
-31,33,786396,786396,FALSE,,"33 * (24 pages = ½ file); Total = 16 ½ files, all files overwritten once + file 0 overwritten again by ½ file"
-32,32,786397,786397,FALSE,,"32 * (24 pages + 1 byte); All files overwritten once + file 0 overwritten again by 32 sblks"
+28,16,786268,786268,TRUE,,"16 * (24 pages = ½ file); Total = 8 files exactly"
+29,17,786268,786268,TRUE,,"17 * (24 pages = ½ file); Total = 8 files + file 0 overwritten by ½ file"
+30,16,786269,786269,TRUE,,"16 * (24 pages + 1 byte); Total = 8 files + file 0 overwritten by 16 sblks"
+31,32,786268,786268,TRUE,,"32 * (24 pages = ½ file); Total = 16 files exactly, all files overwritten once"
+32,33,786268,786268,TRUE,,"33 * (24 pages = ½ file); Total = 16 ½ files, all files overwritten once + file 0 overwritten again by ½ file"
+33,32,786269,786269,TRUE,,"32 * (24 pages + 1 byte); All files overwritten once + file 0 overwritten again by 32 sblks"
,,,,,,
"Multi-page messages (large messages) - tests various paths in encoder; no dequeues required to test this functionality.",,,,,,
-33,16,32732,32732,FALSE,,"16 * (1 page exact fit)"
-34,16,32733,32733,FALSE,,"16 * (1 page + 1 byte): tail split"
-35,16,32743,32743,FALSE,,"16 * (1 page + 11 bytes): tail split"
-36,16,32744,32744,FALSE,,"16 * (1 page + 12 bytes): tail separated exactly"
-37,16,32745,32745,FALSE,,"16 * (1 page + 13 bytes): data split"
-38,16,65500,65500,FALSE,,"16 * (2 pages exact fit)"
-39,16,65501,65501,FALSE,,"16 * (2 pages + 1 byte): tail split"
-40,16,65511,65511,FALSE,,"16 * (2 pages + 11 bytes): tail split"
-41,16,65512,65512,FALSE,,"16 * (2 pages + 12 bytes): tail separated exactly"
-42,16,65513,65513,FALSE,,"16 * (2 pages + 13 bytes) data split"
-43,16,131036,131036,FALSE,,"16 * (4 pages exact fit)"
-44,16,131037,131037,FALSE,,"16 * (4 pages + 1 byte: tail split)"
-45,16,131047,131047,FALSE,,"16 * (4 pages + 1 byte: tail split)"
-46,16,131048,131048,FALSE,,"16 * (4 pages + 12 bytes: tail separated)"
-47,16,131049,131049,FALSE,,"16 * (4 pages + 13 bytes: data split)"
-48,16,114652,114652,FALSE,,"16 * (3.5 pages)"
-49,16,114653,114653,FALSE,,"16 * (3.5 pages + 1 byte)"
+34,16,32732,32732,FALSE,,"16 * (1 page exact fit)"
+35,16,32733,32733,FALSE,,"16 * (1 page + 1 byte): tail split"
+36,16,32743,32743,FALSE,,"16 * (1 page + 11 bytes): tail split"
+37,16,32744,32744,FALSE,,"16 * (1 page + 12 bytes): tail separated exactly"
+38,16,32745,32745,FALSE,,"16 * (1 page + 13 bytes): data split"
+39,16,65500,65500,FALSE,,"16 * (2 pages exact fit)"
+40,16,65501,65501,FALSE,,"16 * (2 pages + 1 byte): tail split"
+41,16,65511,65511,FALSE,,"16 * (2 pages + 11 bytes): tail split"
+42,16,65512,65512,FALSE,,"16 * (2 pages + 12 bytes): tail separated exactly"
+43,16,65513,65513,FALSE,,"16 * (2 pages + 13 bytes) data split"
+44,16,131036,131036,FALSE,,"16 * (4 pages exact fit)"
+45,16,131037,131037,FALSE,,"16 * (4 pages + 1 byte: tail split)"
+46,16,131047,131047,FALSE,,"16 * (4 pages + 1 byte: tail split)"
+47,16,131048,131048,FALSE,,"16 * (4 pages + 12 bytes: tail separated)"
+48,16,131049,131049,FALSE,,"16 * (4 pages + 13 bytes: data split)"
+49,16,114652,114652,FALSE,,"16 * (3.5 pages)"
+50,16,114653,114653,FALSE,,"16 * (3.5 pages + 1 byte)"
Modified: store/trunk/cpp/tests/jrnl/tests.ods
===================================================================
(Binary files differ)
16 years, 9 months
rhmessaging commits: r920 - in store/trunk/cpp: tests and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2007-09-14 11:13:38 -0400 (Fri, 14 Sep 2007)
New Revision: 920
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/tests/persistence.py
Log:
Updated python test to allocate credit as subscriptions now start with none.
Fixed enqueue to set the async completion for more than the first queue.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-10 20:24:55 UTC (rev 919)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-14 15:13:38 UTC (rev 920)
@@ -622,9 +622,10 @@
messageId = messageIdSequence.next();
store(&queue, txn->get(), key, msg);
msg.setPersistenceId(messageId);
- if (!usingJrnl())
- msg.enqueueComplete(); // set enqueued for ack
}
+ if (!usingJrnl()) {
+ msg.enqueueComplete(); // set enqueued for ack
+ }
if (!usingJrnl())
put(mappingDb, txn->get(), key, value);
Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py 2007-09-10 20:24:55 UTC (rev 919)
+++ store/trunk/cpp/tests/persistence.py 2007-09-14 15:13:38 UTC (rev 920)
@@ -117,6 +117,8 @@
channel.queue_delete(queue="queue-c")
channel.message_subscribe(destination="ctag", queue="queue-a")
+ channel.message_flow(destination="ctag", unit=0, value=0xFFFFFFFF)
+ channel.message_flow(destination="ctag", unit=1, value=0xFFFFFFFF)
included = self.client.queue("ctag")
msg = included.get(timeout=1)
self.assertExpectedContent(msg, "Msg0004", "A_Message3")
16 years, 9 months
rhmessaging commits: r919 - store/trunk/cpp/lib/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: cctrieloff
Date: 2007-09-10 16:24:55 -0400 (Mon, 10 Sep 2007)
New Revision: 919
Modified:
store/trunk/cpp/lib/jrnl/data_tok.cpp
store/trunk/cpp/lib/jrnl/data_tok.hpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
Log:
- general integration code
- commented out exception in rid() of dtok
Modified: store/trunk/cpp/lib/jrnl/data_tok.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-09-09 19:42:12 UTC (rev 918)
+++ store/trunk/cpp/lib/jrnl/data_tok.cpp 2007-09-10 20:24:55 UTC (rev 919)
@@ -125,9 +125,9 @@
const u_int64_t
data_tok::rid() const throw (jexception)
{
- if (_wstate == NONE)
+/* if (_wstate == NONE)
throw new jexception(jerrno::JERR_DTOK_RIDNOTSET,
- "Instance in write state NONE; rid not known.", "data_tok","rid");
+ "Instance in write state NONE; rid not known.", "data_tok","rid"); */
return _rid;
}
Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-09-09 19:42:12 UTC (rev 918)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp 2007-09-10 20:24:55 UTC (rev 919)
@@ -93,6 +93,7 @@
size_t _dsize; ///< Data size in bytes
u_int32_t _dblks_proc; ///< Data blocks read/written
u_int64_t _rid; ///< RID of data set by enqueue operation
+ u_int64_t _dequeue_rid; ///< RID of data set by dequeue operation
qpid::broker::PersistableMessage* _sourceMsg; ///< Pointer back to source Message in Broker
public:
@@ -124,6 +125,8 @@
inline void reset_dblks_proc() { _dblks_proc = 0; }
const u_int64_t rid() const throw (jexception);
inline void set_rid(const u_int64_t rid) { _rid = rid; }
+ inline const u_int64_t dequeue_rid() const throw (jexception) {return _dequeue_rid; }
+ inline void set_dequeue_rid(const u_int64_t rid) { _dequeue_rid = rid; }
void reset();
};
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-09 19:42:12 UTC (rev 918)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-10 20:24:55 UTC (rev 919)
@@ -146,7 +146,7 @@
/**
* \brief Destructor.
*/
- ~jcntl();
+ ~jcntl();
/**
* \brief Initialize the journal for storing data.
@@ -173,6 +173,17 @@
initialize(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list, &aio_wr_callback );
}
+
+ /// replace with real code to recover
+// void recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
+// std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb) throw (jexception);
+
+
+ void recover() {
+ initialize(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list, &aio_wr_callback );
+ }
+
+
/**
* \brief Delete the journal directory of files matching the base filename
* by moving them into a subdirectory
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-09-09 19:42:12 UTC (rev 918)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-09-10 20:24:55 UTC (rev 919)
@@ -107,7 +107,10 @@
u_int64_t rid;
if (dtok->getSourceMessage())
+ {
rid = dtok->rid();
+ assert(rid != 0);
+ }
else
rid = cont ? _wrfc.rid() - 1 : _wrfc.get_incr_rid();
@@ -211,7 +214,16 @@
if (res != RHM_IORES_SUCCESS)
return res;
- u_int64_t rid = _wrfc.get_incr_rid();
+ u_int64_t rid;
+ if (dtok->getSourceMessage())
+ {
+ rid = dtok->dequeue_rid();
+ assert(rid != 0);
+ }
+ else
+ rid = _wrfc.get_incr_rid();
+
+
//***
// NOTE: ASSUMPTION: sizeof(deq_hdr) <= JRNL_DBLK_SIZE
// This encoding is a simplification: it assumes deq_hdr (currently 20 bytes) fits inside
16 years, 9 months
rhmessaging commits: r918 - store/trunk/cpp/lib/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: cctrieloff
Date: 2007-09-09 15:42:12 -0400 (Sun, 09 Sep 2007)
New Revision: 918
Modified:
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jdir.hpp
Log:
doc corrections
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-09 18:16:07 UTC (rev 917)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-09 19:42:12 UTC (rev 918)
@@ -157,8 +157,8 @@
* <b>NOTE: Any existing journal will be ignored by this operation.</b> To use recover
* the data from an existing journal, use recover().
*
- * <b>NOTE: if NULL is passed to the deque's they are created internally and deleted intenally
- * <b>NOTE: if NULL is passed to the callbacks internal ones will be used.
+ * <b>NOTE: if NULL is passed to the deque's they are created internally and deleted internally </b>
+ * <b>NOTE: if NULL is passed to the callbacks internal ones will be used. </b>
*
* \param rd_dtokl deque for storing data tokens returning from read AIO operations.
* \param rd_cb Function pointer to callback function for read operations. May be NULL.
Modified: store/trunk/cpp/lib/jrnl/jdir.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.hpp 2007-09-09 18:16:07 UTC (rev 917)
+++ store/trunk/cpp/lib/jrnl/jdir.hpp 2007-09-09 19:42:12 UTC (rev 918)
@@ -74,7 +74,7 @@
* \brief Create journal directory as set in the dirname parameter of the constructor.
* Recursive creation is supported.
*
- * \expeption ??
+ * \exception ??
*/
void create_dir() throw (jexception);
@@ -83,7 +83,7 @@
*
* \param dirname C-string containing name of directory.
*
- * \expeption ??
+ * \exception ??
*/
static void create_dir(const char* dirname) throw (jexception);
@@ -92,7 +92,7 @@
*
* \param dirname String containing name of directory.
*
- * \expeption ??
+ * \exception ??
*/
static void create_dir(const std::string& dirname) throw (jexception);
16 years, 9 months
rhmessaging commits: r917 - store/trunk/cpp/lib/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: cctrieloff
Date: 2007-09-09 14:16:07 -0400 (Sun, 09 Sep 2007)
New Revision: 917
Modified:
store/trunk/cpp/lib/jrnl/jdir.cpp
store/trunk/cpp/lib/jrnl/jdir.hpp
Log:
added option to delete, to only delete sub-dirs
Modified: store/trunk/cpp/lib/jrnl/jdir.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.cpp 2007-09-09 02:02:30 UTC (rev 916)
+++ store/trunk/cpp/lib/jrnl/jdir.cpp 2007-09-09 18:16:07 UTC (rev 917)
@@ -174,19 +174,19 @@
// === delete_dir ===
void
-jdir::delete_dir() throw (jexception)
+jdir::delete_dir(bool children_only) throw (jexception)
{
- delete_dir(_dirname);
+ delete_dir(_dirname, children_only);
}
void
-jdir::delete_dir(const char* dirname) throw (jexception)
+jdir::delete_dir(const char* dirname, bool children_only) throw (jexception)
{
- delete_dir(std::string(dirname));
+ delete_dir(std::string(dirname), children_only);
}
void
-jdir::delete_dir(const std::string& dirname) throw (jexception)
+jdir::delete_dir(const std::string& dirname, bool children_only) throw (jexception)
{
struct dirent* entry;
struct stat s;
@@ -253,12 +253,14 @@
ss << "dir=\"" << dirname << "\" errno=" << errno;
throw jexception(jerrno::JERR_JDIR_CLOSEDIR, ss.str(), "jdir", "delete_dir");
}
- if (::rmdir(dirname.c_str()))
- {
- std::stringstream ss;
- ss << "dir=\"" << dirname << "\" errno=" << errno;
- throw jexception(jerrno::JERR_JDIR_RMDIR, ss.str(), "jdir", "delete_dir");
- }
+
+ if (!children_only)
+ if (::rmdir(dirname.c_str()))
+ {
+ std::stringstream ss;
+ ss << "dir=\"" << dirname << "\" errno=" << errno;
+ throw jexception(jerrno::JERR_JDIR_RMDIR, ss.str(), "jdir", "delete_dir");
+ }
}
Modified: store/trunk/cpp/lib/jrnl/jdir.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.hpp 2007-09-09 02:02:30 UTC (rev 916)
+++ store/trunk/cpp/lib/jrnl/jdir.hpp 2007-09-09 18:16:07 UTC (rev 917)
@@ -153,13 +153,15 @@
* contain. This is the equivalent of rm -rf.
*
* FIXME: links are not handled correctly.
+ *
+ * \param children_only To only delete children.
*
* \exception The journal directory could not be opened.
* \exception The move of files from the journal directory to the created backup
* directory failed.
* \exception The directory handle could not be closed.
*/
- void delete_dir() throw (jexception);
+ void delete_dir(bool children_only = false ) throw (jexception);
/**
* \brief Delete the journal directory and all files and sub-directories that it may
@@ -168,13 +170,14 @@
* FIXME: links are not handled correctly.
*
* \param dirname C-string containing name of directory to be deleted.
+ * \param children_only To only delete children.
*
* \exception The journal directory could not be opened.
* \exception The move of files from the journal directory to the created backup
* directory failed.
* \exception The directory handle could not be closed.
*/
- static void delete_dir(const char* dirname) throw (jexception);
+ static void delete_dir(const char* dirname, bool children_only = false) throw (jexception);
/**
* \brief Delete the journal directory and all files and sub-directories that it may
@@ -183,13 +186,14 @@
* FIXME: links are not handled correctly.
*
* \param dirname String containing name of directory to be deleted.
+ * \param children_only To only delete children.
*
* \exception The journal directory could not be opened.
* \exception The move of files from the journal directory to the created backup
* directory failed.
* \exception The directory handle could not be closed.
*/
- static void delete_dir(const std::string& dirname) throw (jexception);
+ static void delete_dir(const std::string& dirname, bool children_only = false) throw (jexception);
/**
* \brief Create backup directory that is next in sequence and move all journal files
16 years, 9 months
rhmessaging commits: r916 - store/trunk/cpp/tests.
by rhmessaging-commits@lists.jboss.org
Author: cctrieloff
Date: 2007-09-08 22:02:30 -0400 (Sat, 08 Sep 2007)
New Revision: 916
Modified:
store/trunk/cpp/tests/SimpleTest.cpp
Log:
- added journal create / delete test
- need some more work to check dirs/ notes in test
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2007-09-09 02:00:50 UTC (rev 915)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2007-09-09 02:02:30 UTC (rev 916)
@@ -70,6 +70,7 @@
class SimpleTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(SimpleTest);
+ CPPUNIT_TEST(testCreateDelete);
CPPUNIT_TEST(testEmptyRecover);
CPPUNIT_TEST(testQueueCreate);
CPPUNIT_TEST(testQueueCreateWithSettings);
@@ -114,6 +115,21 @@
//nothing to assert, just testing it doesn't blow up
}
+ void testCreateDelete()
+ {
+ BdbMessageStore store;
+ store.truncate();//make sure it is empty to begin with
+ string name("CreateDeleteQueue");
+ Queue queue(name, 0, &store, 0);
+ store.create(queue);
+ // TODO - check dir exists
+ CPPUNIT_ASSERT(queue.getPersistenceId());
+ store.destroy(queue);
+ // TODO - check dir is deleted
+
+ }
+
+
void testQueueCreate()
{
uint64_t id(0);
16 years, 9 months
rhmessaging commits: r915 - store/trunk/cpp/lib/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: cctrieloff
Date: 2007-09-08 22:00:50 -0400 (Sat, 08 Sep 2007)
New Revision: 915
Modified:
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jdir.cpp
Log:
- comments
- errno commented
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-08 16:06:25 UTC (rev 914)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-09 02:00:50 UTC (rev 915)
@@ -208,6 +208,7 @@
{
//kpvdr TODO -- can we get rid of the copy???
+// need to delete the dtok's
std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_wr_cmpl_dtok_list.begin(),
journal->_aio_wr_cmpl_dtok_list.end());
journal->_aio_wr_cmpl_dtok_list.clear();
@@ -232,6 +233,7 @@
jcntl::aio_rd_callback(jcntl* journal, u_int32_t num_dtoks)
{
//kpvdr TODO -- can we get rid of the copy???
+// need to delete the dtok's
std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_rd_cmpl_dtok_list.begin(),
journal->_aio_rd_cmpl_dtok_list.end());
journal->_aio_rd_cmpl_dtok_list.clear();
Modified: store/trunk/cpp/lib/jrnl/jdir.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jdir.cpp 2007-09-08 16:06:25 UTC (rev 914)
+++ store/trunk/cpp/lib/jrnl/jdir.cpp 2007-09-09 02:00:50 UTC (rev 915)
@@ -154,14 +154,14 @@
}
}
}
- if (errno)
+/* if (errno)
{
std::stringstream ss;
ss << "dir=\"" << dirname << "\" errno=" << errno;
::closedir(dir); // Try to close, it makes no sense to trap errors here...
throw jexception(jerrno::JERR_JDIR_READDIR, ss.str(), "jdir", "clear_dir");
}
-#endif
+ */#endif
if (::closedir(dir))
{
std::stringstream ss;
@@ -193,6 +193,9 @@
DIR* dir = ::opendir(dirname.c_str());
if (!dir)
{
+ if (errno == ENOENT) // dir does not exist.
+ return;
+
std::stringstream ss;
ss << "dir=\"" << dirname << "\" errno=" << errno;
throw jexception(jerrno::JERR_JDIR_OPENDIR, ss.str(), "jdir", "delete_dir");
@@ -235,14 +238,14 @@
}
}
}
- if (errno)
+/* if (errno)
{
std::stringstream ss;
ss << "dir=\"" << dirname << "\" errno=" << errno;
::closedir(dir); // Try to close, it makes no sense to trap errors here...
throw jexception(jerrno::JERR_JDIR_READDIR, ss.str(), "jdir", "delete_dir");
}
- }
+ */ }
// Now dir is empty, close and delete it
if (::closedir(dir))
{
@@ -290,14 +293,14 @@
}
}
}
- if (errno)
+/* if (errno)
{
std::stringstream ss;
ss << "dir=\"" << dirname << "\" errno=" << errno;
::closedir(dir); // Try to close, it makes no sense to trap errors here...
throw jexception(jerrno::JERR_JDIR_READDIR, ss.str(), "jdir", "clear_dir");
}
- if (::closedir(dir))
+ */ if (::closedir(dir))
{
std::stringstream ss;
ss << "dir=\"" << dirname << "\" errno=" << errno;
16 years, 9 months