[rhmessaging-commits] rhmessaging commits: r1477 - in store/trunk/cpp: lib/jrnl and 1 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Thu Dec 13 12:19:52 EST 2007
Author: kpvdr
Date: 2007-12-13 12:19:52 -0500 (Thu, 13 Dec 2007)
New Revision: 1477
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.hpp
Log:
Code tidy-up: separation of qpid dependencies from class jcntl by moving them to subclass JournalImpl.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-12-13 17:19:52 UTC (rev 1477)
@@ -296,3 +296,62 @@
return r;
}
+/**
+ * \brief Static AIO write-completion callback handed to jcntl.
+ *
+ * Takes a snapshot of the journal's write-completion token list, clears the original list, and
+ * then handles at most \a num_dtoks tokens from the front of the snapshot. For each token that
+ * has a source message (and while the journal is not stopped), an enqueue token triggers
+ * enqueueComplete() on that message; dequeue tokens are currently not signalled. Every handled
+ * token has release() called on it.
+ *
+ * \param journal Journal raising the callback. It is downcast with static_cast, so it is
+ *     assumed to really be a JournalImpl.
+ * \param num_dtoks Number of completed data tokens to handle.
+ */
+void
+JournalImpl::aio_wr_callback(jcntl* journal, u_int32_t num_dtoks)
+{
+    JournalImpl* jip = static_cast<JournalImpl*>(journal);
+//kpvdr TODO -- this list needs to be mutexed...???
+    std::deque<rhm::journal::data_tok*> this_dtok_list(jip->_aio_wr_cmpl_dtok_list.begin(),
+            jip->_aio_wr_cmpl_dtok_list.end());
+
+    jip->_aio_wr_cmpl_dtok_list.clear();
+    // Also stop when the snapshot runs dry: calling front()/pop_front() on an empty deque is
+    // undefined behaviour, which the count-only loop hit whenever num_dtoks exceeded the
+    // number of queued tokens.
+    for (u_int32_t i=0; i<num_dtoks && !this_dtok_list.empty(); i++)
+    {
+        data_tok* dtokp = this_dtok_list.front();
+        this_dtok_list.pop_front();
+        if (!journal->is_stopped() && dtokp->getSourceMessage())
+        {
+            switch (dtokp->wstate())
+            {
+                case data_tok::ENQ:
+                    dtokp->getSourceMessage()->enqueueComplete();
+                    break;
+                case data_tok::DEQ:
+/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
+                    dtokp->getSourceMessage()->dequeueComplete();
+                    if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
+                        dtokp->getSourceMessage()->setPersistenceId(0);
+*/
+                    break;
+                default:
+                    ;
+            }
+        }
+        dtokp->release();
+    }
+}
+
+/**
+ * \brief Static AIO read-completion callback handed to jcntl.
+ *
+ * Takes a snapshot of the journal's read-completion token list, clears the original list, and
+ * then handles at most \a num_dtoks tokens from the front of the snapshot. No per-message
+ * action is taken yet (the recovery-manager / lazy-load hook is still a placeholder); every
+ * handled token has release() called on it.
+ *
+ * \param journal Journal raising the callback. It is downcast with static_cast, so it is
+ *     assumed to really be a JournalImpl.
+ * \param num_dtoks Number of completed data tokens to handle.
+ */
+void
+JournalImpl::aio_rd_callback(jcntl* journal, u_int32_t num_dtoks)
+{
+    JournalImpl* jip = static_cast<JournalImpl*>(journal);
+//kpvdr TODO -- can we get rid of the copy???
+    std::deque<rhm::journal::data_tok*> this_dtok_list(jip->_aio_rd_cmpl_dtok_list.begin(),
+            jip->_aio_rd_cmpl_dtok_list.end());
+    jip->_aio_rd_cmpl_dtok_list.clear();
+    // Also stop when the snapshot runs dry: calling front()/pop_front() on an empty deque is
+    // undefined behaviour, which the count-only loop hit whenever num_dtoks exceeded the
+    // number of queued tokens.
+    for (u_int32_t i=0; i<num_dtoks && !this_dtok_list.empty(); i++)
+    {
+        data_tok* dtokp = this_dtok_list.front();
+        this_dtok_list.pop_front();
+        if (!journal->is_stopped() && dtokp->getSourceMessage())
+        {
+            if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
+            {
+                // cct call the recovery manager. / lazyload..
+            }
+        }
+        dtokp->release();
+    }
+
+}
+
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-12-13 17:19:52 UTC (rev 1477)
@@ -81,6 +81,9 @@
journal::data_tok _dtok;
bool _external;
+ std::deque<rhm::journal::data_tok*> _aio_rd_cmpl_dtok_list; ///< Internally managed deque, passed to jcntl together with aio_rd_callback
+ std::deque<rhm::journal::data_tok*> _aio_wr_cmpl_dtok_list; ///< Internally managed deque, passed to jcntl together with aio_wr_callback
+
public:
JournalImpl(const std::string& journalId,
const std::string& journalDirectory,
@@ -91,14 +94,18 @@
const qpid::sys::Duration flushTimeout);
virtual ~JournalImpl();
+    /// Initialize the journal, supplying this object's own completion deques and static
+    /// AIO callbacks to the four-argument jcntl::initialize().
+    inline void initialize()
+    {
+        jcntl::initialize(&_aio_rd_cmpl_dtok_list, &aio_rd_callback,
+                          &_aio_wr_cmpl_dtok_list, &aio_wr_callback);
+    }
+
void recover(std::deque<journal::data_tok*>* rd_dtokl, const journal::aio_cb rd_cb,
std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
u_int64_t& highest_rid, u_int64_t queue_id);
- void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
- u_int64_t& highest_rid, u_int64_t queue_id)
- {
+ /// Convenience overload: forwards to the full recover() using this object's own completion
+ /// deques and static AIO callbacks.
+ inline void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+ u_int64_t& highest_rid, u_int64_t queue_id) {
recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
&aio_wr_callback, prep_tx_list, highest_rid, queue_id);
}
@@ -144,6 +151,8 @@
private:
const journal::iores handleInactivityTimer(const journal::iores r);
+ static void aio_wr_callback(jcntl* journal, u_int32_t num_dtoks);
+ static void aio_rd_callback(jcntl* journal, u_int32_t num_dtoks);
}; // class JournalImpl
} // namespace bdbstore
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-12-13 17:19:52 UTC (rev 1477)
@@ -41,7 +41,6 @@
#include <jrnl/jerrno.hpp>
#include <jrnl/jinf.hpp>
#include <sstream>
-#include <qpid/broker/PersistableMessage.h>
#include <unistd.h>
namespace rhm
@@ -738,64 +737,5 @@
}
}
-
-void
-jcntl::aio_wr_callback(jcntl* journal, u_int32_t num_dtoks)
-{
-//kpvdr TODO -- this list needs to be mutexed...???
- std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_wr_cmpl_dtok_list.begin(),
- journal->_aio_wr_cmpl_dtok_list.end());
-
- journal->_aio_wr_cmpl_dtok_list.clear();
- for (u_int32_t i=0; i<num_dtoks; i++)
- {
- data_tok*& dtokp = this_dtok_list.front();
- if (!journal->is_stopped() && dtokp->getSourceMessage())
- {
- switch (dtokp->wstate())
- {
- case data_tok::ENQ:
- dtokp->getSourceMessage()->enqueueComplete();
- break;
- case data_tok::DEQ:
-/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
- dtokp->getSourceMessage()->dequeueComplete();
- if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
- dtokp->getSourceMessage()->setPersistenceId(0);
-*/
- break;
- default:
- ;
- }
- }
- dtokp->release();
- this_dtok_list.pop_front();
- }
-}
-
-void
-jcntl::aio_rd_callback(jcntl* journal, u_int32_t num_dtoks)
-{
-
-//kpvdr TODO -- can we get rid of the copy???
- std::deque<rhm::journal::data_tok*> this_dtok_list(journal->_aio_rd_cmpl_dtok_list.begin(),
- journal->_aio_rd_cmpl_dtok_list.end());
- journal->_aio_rd_cmpl_dtok_list.clear();
- for (u_int32_t i=0; i<num_dtoks; i++)
- {
- data_tok*& dtokp = this_dtok_list.front();
- if (!journal->is_stopped() && dtokp->getSourceMessage())
- {
- if (dtokp->wstate() == data_tok::ENQ && dtokp->rstate() == data_tok::READ)
- {
- // cct call the recovery manager. / lazyload..
- }
- }
- dtokp->release();
- this_dtok_list.pop_front();
- }
-
-}
-
} // namespace journal
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-12-13 17:19:52 UTC (rev 1477)
@@ -49,7 +49,6 @@
#include <jrnl/slock.hpp>
#include <jrnl/wmgr.hpp>
#include <jrnl/wrfc.hpp>
-#include <qpid/broker/PersistableQueue.h>
namespace rhm
{
@@ -143,9 +142,6 @@
rcvdat _rcvdat; ///< Recovery data used for recovery
pthread_mutex_t _mutex; ///< Mutex for thread safety
- std::deque<rhm::journal::data_tok*> _aio_rd_cmpl_dtok_list; ///< Internally mamanged deque
- std::deque<rhm::journal::data_tok*> _aio_wr_cmpl_dtok_list; ///< Internally mamanged deque
-
public:
/**
* \brief Journal constructor.
@@ -193,19 +189,6 @@
std::deque<data_tok*>* wdtoklp, const aio_cb wr_cb);
/**
- * \brief Initialize using internal default callbacks and data_tok lists.
- *
- * TODO: Move to JournalImpl later
- *
- * \exception TODO
- */
- void initialize()
- {
- initialize(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
- &aio_wr_callback );
- }
-
- /**
* /brief Initialize journal by recovering state from previously written journal.
*
* Initialize journal by recovering state from previously written journal. The journal files
@@ -235,19 +218,6 @@
const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid);
/**
- * \brief Recover using internal default callbacks and data_tok lists.
- *
- * TODO: Move to JournalImpl later
- *
- * \exception TODO
- */
- void recover(const std::vector<std::string>& prep_txn_list, u_int64_t& highest_rid)
- {
- recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
- &aio_wr_callback, prep_txn_list, highest_rid);
- }
-
- /**
* \brief Notification to the journal that recovery is complete and that normal operation
* may resume.
*
@@ -656,16 +626,6 @@
std::streampos& read_pos);
void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset);
-
- /**
- * Intenal callback write
- */
- static void aio_wr_callback(jcntl* journal, u_int32_t num_dtoks);
-
- /**
- * Intenal callback write
- */
- static void aio_rd_callback(jcntl* journal, u_int32_t num_dtoks);
};
} // namespace journal
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-12-13 17:19:52 UTC (rev 1477)
@@ -30,9 +30,6 @@
*/
#include "JournalSystemTests.hpp"
-//#include "msg_producer.hpp"
-//#include "msg_consumer.hpp"
-#include <vector>
#define NUM_MSGS 5
#define MAX_AIO_SLEEPS 500
@@ -67,7 +64,7 @@
{
char* test_name = "InitializationTest";
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
}
catch (const rhm::journal::jexception& e)
{
@@ -87,15 +84,15 @@
char* test_name = "EmptyRecoverTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
jc.recover_complete();
}
}
@@ -114,7 +111,7 @@
{
char* test_name = "EnqueueTest";
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
// Non-txn
for (int m=0; m<NUM_MSGS; m++)
@@ -145,13 +142,13 @@
char* test_name = "RecoverReadTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
for (int m=0; m<NUM_MSGS; m++)
enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -165,7 +162,7 @@
test_name = "TxnRecoverReadTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
create_xid(xid, 1, XID_SIZE);
txn_list.push_back(xid);
for (int m=0; m<NUM_MSGS; m++)
@@ -174,7 +171,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -203,13 +200,13 @@
char* test_name = "RecoveredReadTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
for (int m=0; m<NUM_MSGS; m++)
enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -231,7 +228,7 @@
test_name = "TxnRecoveredReadTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
create_xid(xid, 2, XID_SIZE);
txn_list.push_back(xid);
for (int m=0; m<NUM_MSGS; m++)
@@ -240,7 +237,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -277,13 +274,13 @@
char* test_name = "RecoveredDequeueTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
for (int m=0; m<NUM_MSGS; m++)
enq_msg(&jc, create_msg(msg, m, MSG_SIZE), false);
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -307,7 +304,7 @@
test_name = "TxnRecoveredDequeueTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
create_xid(xid, 3, XID_SIZE);
txn_list.push_back(xid);
for (int m=0; m<NUM_MSGS; m++)
@@ -316,7 +313,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
for (int m=0; m<NUM_MSGS; m++)
{
read_msg(&jc);
@@ -355,7 +352,7 @@
char* test_name = "FlagsRecoverdTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
// Transient msgs - should not recover
for (int m=0; m<NUM_MSGS; m++)
enq_msg(&jc, create_msg(msg, m, MSG_SIZE), true);
@@ -371,7 +368,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
// Recover non-transient msgs
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
@@ -424,7 +421,7 @@
test_name = "TxnFlagsRecoverdTest";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
create_xid(xid, 4, XID_SIZE);
txn_list.push_back(xid);
// Transient msgs - should not recover
@@ -443,7 +440,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
// Recover non-transient msgs
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
{
@@ -511,7 +508,7 @@
char* test_name = "ComplexRecoveryTest1";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
// Enqueue 2n, then dequeue first n msgs; check that only last n readable
// rids: 0 to NUM_MSGS*2 - 1
@@ -531,7 +528,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
// Check that only last n readable (as before)
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
@@ -568,7 +565,7 @@
test_name = "TxnComplexRecoveryTest1";
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.initialize();
+ jrnl_init(&jc);
// Enqueue 2n, then dequeue first n msgs; check that only last n readable
// rids: 0 to NUM_MSGS - 1
@@ -595,7 +592,7 @@
}
{
rhm::journal::jcntl jc(test_name, "jdata", test_name, NUM_JFILES, JFSIZE_SBLKS);
- jc.recover(txn_list, highest_rid);
+ jrnl_recover(&jc, txn_list, highest_rid);
// Check that only last n readable (as before)
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
@@ -648,6 +645,19 @@
// === Private helper functions ===
void
+JournalSystemTests::jrnl_init(rhm::journal::jcntl* jc)
+{
+    // Test helper: initialize the journal with the fixture's own completion deques and
+    // no AIO callbacks (both callback arguments are NULL).
+    std::deque<rhm::journal::data_tok*>* const rdListPtr = &aioRdCmplList;
+    std::deque<rhm::journal::data_tok*>* const wrListPtr = &aioWrCmplList;
+    jc->initialize(rdListPtr, NULL, wrListPtr, NULL);
+}
+
+void
+JournalSystemTests::jrnl_recover(rhm::journal::jcntl* jc, vector<string> txn_list,
+        u_int64_t& highest_rid)
+{
+    // Test helper: recover the journal with the fixture's own completion deques and
+    // no AIO callbacks (both callback arguments are NULL); highest_rid is passed through
+    // by reference to jcntl::recover().
+    std::deque<rhm::journal::data_tok*>* const rdListPtr = &aioRdCmplList;
+    std::deque<rhm::journal::data_tok*>* const wrListPtr = &aioWrCmplList;
+    jc->recover(rdListPtr, NULL, wrListPtr, NULL, txn_list, highest_rid);
+}
+
+void
JournalSystemTests::enq_msg(rhm::journal::jcntl* jc, const string msg, const bool transient)
{
rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.hpp 2007-12-13 13:32:57 UTC (rev 1476)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.hpp 2007-12-13 17:19:52 UTC (rev 1477)
@@ -32,6 +32,7 @@
#include "../test_plugin.h"
#include <jrnl/jcntl.hpp>
+#include <vector>
class JournalSystemTests : public CppUnit::TestCase
{
@@ -55,6 +56,8 @@
size_t xidsize;
bool transientFlag;
bool externalFlag;
+ std::deque<rhm::journal::data_tok*> aioRdCmplList;
+ std::deque<rhm::journal::data_tok*> aioWrCmplList;
public:
void InstantiationTest();
@@ -68,6 +71,9 @@
void ComplexRecoveryTest1();
private:
+ void jrnl_init(rhm::journal::jcntl* jc);
+ void jrnl_recover(rhm::journal::jcntl* jc, std::vector<std::string> txn_list,
+ u_int64_t& highest_rid);
void enq_msg(rhm::journal::jcntl* jc, const std::string msg, const bool transient);
void enq_extern_msg(rhm::journal::jcntl* jc, const bool transient);
void enq_txn_msg(rhm::journal::jcntl* jc, const std::string msg, const std::string xid,
More information about the rhmessaging-commits
mailing list