Author: kpvdr
Date: 2007-11-02 09:30:43 -0400 (Fri, 02 Nov 2007)
New Revision: 1209
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/jerrno.cpp
store/trunk/cpp/lib/jrnl/jerrno.hpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
store/trunk/cpp/tests/jrnl/rtest
Log:
Created timer in JournalImpl that will keep calling get_events until the number of
outstanding AIO events is 0.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-11-02 13:30:43 UTC (rev 1209)
@@ -44,6 +44,7 @@
static const u_int8_t MESSAGE_MESSAGE = 1;
static const u_int8_t BASIC_MESSAGE = 2;
bool BdbMessageStore::useAsync;
+qpid::sys::Duration BdbMessageStore::defJournalTimeout(1000); // 1us (timeout value is in
nanoseconds)
unsigned int TxnCtxt::count = 0;
@@ -148,7 +149,7 @@
THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
}
if (usingJrnl()) {
- JournalImpl* jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
string("JournalData"));
+ JournalImpl* jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
string("JournalData"), defJournalTimeout);
queue.setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
try {
// init will create the deque's for the init...
@@ -327,7 +328,7 @@
if (usingJrnl())
{
const char* queueName = queue->getName().c_str();
- JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName),
string("JournalData"));
+ JournalImpl* jQueue = new JournalImpl(queueName, getJrnlDir(queueName),
string("JournalData"), defJournalTimeout);
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
try
@@ -756,13 +757,14 @@
void BdbMessageStore::flush(const qpid::broker::PersistableQueue& queue)
{
+ if (!usingJrnl()) return;
checkInit();
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
if (jc){
jc->flush();
- ::usleep(10000); /////////////// hack ----------- FIX!!
- jc->get_wr_events();
+// ::usleep(10000); /////////////// hack ----------- FIX!!
+// jc->get_wr_events();
}
}catch ( journal::jexception& e) {
std::string str;
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-11-02 13:30:43 UTC (rev 1209)
@@ -34,6 +34,7 @@
#include "TxnCtxt.h"
#include <qpid/broker/MessageStore.h>
#include <qpid/sys/Monitor.h>
+#include <qpid/sys/Time.h>
#include <map>
#include <set>
#include <iostream>
@@ -75,6 +76,7 @@
std::string storeDir;
bool isInit;
const char* envPath;
+ static qpid::sys::Duration defJournalTimeout;
void recoverQueues(TxnCtxt& txn, qpid::broker::RecoveryManager&
recovery, queue_index& index,
txn_list& locked, message_index& messages);
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-11-02 13:30:43 UTC (rev 1209)
@@ -23,17 +23,29 @@
#include "JournalImpl.h"
#include "jrnl/jerrno.hpp"
+#include <qpid/sys/Monitor.h>
using namespace rhm::bdbstore;
+using namespace rhm::journal;
+qpid::broker::TimerA JournalImpl::journalGetEventsTimer;
+
+
JournalImpl::JournalImpl(const std::string& journalId,
const std::string& journalDirectory,
- const std::string& journalBaseFilename):
- jcntl(journalId, journalDirectory, journalBaseFilename)
-{}
+ const std::string& journalBaseFilename,
+ const qpid::sys::Duration timeout): // timeout is forwarded to the FireEvent timer task
+ jcntl(journalId, journalDirectory, journalBaseFilename),
+ timerSetFlag(false) // no timer task has been queued yet
+{
+ fireEventPtr = new FireEvent(this, timeout); // the task calls back into this->fire()
+ journalGetEventsTimer.start(); // static timer, shared by every JournalImpl instance
+}
JournalImpl::~JournalImpl()
-{}
+{
+ //fireEventPtr->cancel(); // NOTE(review): cancel is disabled here, yet FireEvent holds a raw JournalImpl* - confirm no queued task can fire after destruction
+}
void
JournalImpl::recover(std::deque<journal::data_tok*>* rd_dtokl, const
journal::aio_cb rd_cb,
@@ -70,3 +82,36 @@
}
}
}
+
+void
+JournalImpl::flush() throw (journal::jexception) // flush the journal, then queue the get-events timer task if needed
+{
+ jcntl::flush(); // base-class flush
+ handleEventTimer((const iores)0); // queues the task only when AIO events remain; the iores argument is simply passed through
+}
+
+void
+JournalImpl::fire() // timer callback, reached through FireEvent::fire()
+{
+ if (_wmgr.get_aio_evt_rem()) { // outstanding AIO events: try to collect them
+ jcntl::get_wr_events();
+ }
+ timerSetFlag = false; // this firing has run, so the task is no longer queued
+ if (_wmgr.get_aio_evt_rem()) { // events still outstanding: queue the task again
+ intrusive_ptr_add_ref(fireEventPtr.get()); // presumably balanced by the unref() in FireEvent::fire() - confirm
+ journalGetEventsTimer.add(fireEventPtr);
+ timerSetFlag = true;
+ }
+}
+
+const iores
+JournalImpl::handleEventTimer(const iores res) // queues the timer task if needed; returns res unchanged
+{
+ if (_wmgr.get_aio_evt_rem() && !timerSetFlag) { // AIO events outstanding and no task queued yet
+ intrusive_ptr_add_ref(fireEventPtr.get()); // presumably balanced by the unref() in FireEvent::fire() - confirm
+ journalGetEventsTimer.add(fireEventPtr);
+ timerSetFlag = true; // prevents queuing the same task twice
+ }
+ return res;
+}
+
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-11-02 13:30:43 UTC (rev 1209)
@@ -24,21 +24,35 @@
#ifndef _JournalImpl_
#define _JournalImpl_
+#include <set>
#include "jrnl/jcntl.hpp"
#include "jrnl/data_tok.hpp"
#include "PreparedTransaction.h"
+#include <qpid/broker/Timer.h>
+#include <qpid/sys/Time.h>
#include <boost/ptr_container/ptr_list.hpp>
+#include <boost/intrusive_ptr.hpp>
namespace rhm {
namespace bdbstore {
- class JournalImpl : public journal::jcntl
- {
+ class FireEvent;
+
+ class JournalImpl : public journal::jcntl
+ {
+ private:
+ static qpid::broker::TimerA journalGetEventsTimer;
+ bool timerSetFlag;
+ qpid::broker::TimerTaskA::intrusive_ptr fireEventPtr;
+
+
public:
JournalImpl(const std::string& journalId,
const std::string& journalDirectory,
- const std::string& journalBaseFilename);
+ const std::string& journalBaseFilename,
+ const qpid::sys::Duration timeout);
~JournalImpl();
+
void recover(std::deque<journal::data_tok*>* rd_dtokl, const
journal::aio_cb rd_cb,
std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
boost::ptr_list<bdbstore::PreparedTransaction>&
prep_tx_list,
@@ -50,8 +64,30 @@
recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback,
&_aio_wr_cmpl_dtok_list,
&aio_wr_callback, prep_tx_list, queue_id);
}
+
+ void flush() throw (journal::jexception);
+
+ // TimerTask callback
+ void fire();
+
+ private:
+ const journal::iores handleEventTimer(const journal::iores res);
};
+
+ class FireEvent : public virtual qpid::broker::TimerTaskA // timer task that forwards fire() to its owning JournalImpl
+ {
+ JournalImpl* parent; // raw, non-owning pointer; set to 0 by cancel()
+ public:
+ FireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
+ qpid::broker::TimerTaskA(timeout), parent(p) {}
+ virtual ~FireEvent() {}
+ inline void fire() { if (parent) parent->fire(); unref(); } // skips the callback once cancelled; unref() runs either way
+ inline void cancel() { cancelled=true; parent=0; } // marks the task cancelled and detaches it from the journal
+ };
+
+
+
} // namespace bdbstore
} // namespace rhm
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-11-02 13:30:43 UTC (rev 1209)
@@ -64,7 +64,9 @@
_wrfc(),
_rmgr(this, _emap, _tmap, _rrfc),
_wmgr(this, _emap, _tmap, _wrfc)
-{}
+{
+ pthread_mutex_init(&_mutex, NULL);
+}
jcntl::~jcntl()
{
@@ -78,6 +80,7 @@
::delete _datafh[i];
::delete[] _datafh;
}
+ pthread_mutex_destroy(&_mutex);
}
void
@@ -263,7 +266,20 @@
const u_int32_t
jcntl::get_wr_events() throw (jexception)
{
- return _wmgr.get_events(pmgr::UNUSED);
+ int ret = pthread_mutex_trylock(&_mutex); // non-blocking: only one caller collects events at a time
+ if (ret)
+ {
+ if (ret != EBUSY) // pthread functions report failure via the return value, not errno
+ {
+ std::stringstream ss;
+ ss << "pthread_mutex_trylock() returned " << ret
<< " (" << strerror(ret) << ")";
+ throw jexception(jerrno::JERR__PTHREAD, ss.str(), "jcntl",
"get_wr_events");
+ }
+ return 0; // already locked, return immediately
+ }
+ u_int32_t res = _wmgr.get_events(pmgr::UNUSED); // NOTE(review): if this throws, _mutex stays locked - confirm and release on the error path
+ pthread_mutex_unlock(&_mutex);
+ return res;
}
const u_int32_t
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-11-02 13:30:43 UTC (rev 1209)
@@ -48,6 +48,7 @@
#include <jrnl/rmgr.hpp>
#include <jrnl/wmgr.hpp>
#include <jrnl/wrfc.hpp>
+#include <pthread.h>
#include <qpid/broker/PersistableQueue.h>
namespace rhm
@@ -138,6 +139,7 @@
rmgr _rmgr; ///< Read page manager which manages AIO
wmgr _wmgr; ///< Write page manager which manages AIO
rcvdat _rcvdat; ///< Recovery data used for recovery
+ pthread_mutex_t _mutex; ///< Mutex for thread safety
std::deque<rhm::journal::data_tok*> _aio_rd_cmpl_dtok_list; ///<
Internally mamanged deque
std::deque<rhm::journal::data_tok*> _aio_wr_cmpl_dtok_list; ///<
Internally mamanged deque
@@ -218,7 +220,7 @@
* \param wr_dtokl deque for storing data tokens retruning from enqueue and
dequeue (write)
* AIO operations.
* \param wr_cb Function pointer to callback function for write operations. May be
NULL.
- * \param prep_tx_list
+ * \param prep_txn_list
*
* \exception TODO
*/
Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-11-02 13:30:43 UTC (rev 1209)
@@ -50,6 +50,7 @@
const u_int32_t jerrno::JERR__AIO = 0x0103;
const u_int32_t jerrno::JERR__FILEIO = 0x0104;
const u_int32_t jerrno::JERR__RTCLOCK = 0x0105;
+const u_int32_t jerrno::JERR__PTHREAD = 0x0106;
// class jcntl
const u_int32_t jerrno::JERR_JCNTL_STOPPED = 0x0200;
@@ -127,6 +128,7 @@
_err_map[JERR__AIO] = std::string("JERR__AIO: AIO error.");
_err_map[JERR__FILEIO] = std::string("JERR__FILEIO: File read or write
failure.");
_err_map[JERR__RTCLOCK] = std::string("JERR__RTCLOCK: Reading real-time clock
failed.");
+ _err_map[JERR__PTHREAD] = std::string("JERR__PTHREAD: pthread failure.");
// class jcntl
_err_map[JERR_JCNTL_STOPPED] = std::string("JERR_JCNTL_STOPPED: Operation on
stopped journal.");
Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp 2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp 2007-11-02 13:30:43 UTC (rev 1209)
@@ -67,6 +67,7 @@
static const u_int32_t JERR__AIO; ///< AIO failure
static const u_int32_t JERR__FILEIO; ///< File read or write
failure
static const u_int32_t JERR__RTCLOCK; ///< Reading real-time clock
failed
+ static const u_int32_t JERR__PTHREAD; ///< pthread failure
// class jcntl
static const u_int32_t JERR_JCNTL_STOPPED; ///< Operation on stopped
journal
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-11-02 13:30:43 UTC (rev 1209)
@@ -107,7 +107,7 @@
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
std::stringstream ss;
ss << e;
@@ -122,7 +122,7 @@
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp !=
NULL);
delete jcp;
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
std::stringstream ss;
ss << e;
@@ -139,7 +139,7 @@
rhm::journal::jcntl jc(test_name, "jdata", test_name);
jc.initialize();
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
std::stringstream ss;
ss << e;
@@ -155,7 +155,7 @@
jcp->initialize();
delete jcp;
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
if (jcp)
delete jcp;
@@ -186,7 +186,7 @@
jc.recover_complete();
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
std::stringstream ss;
ss << e;
@@ -219,7 +219,7 @@
delete jcp;
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
if (jcp)
delete jcp;
@@ -240,7 +240,7 @@
for (int m=0; m<NUM_MSGS; m++)
enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
std::stringstream ss;
ss << e;
@@ -258,7 +258,7 @@
enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
delete jcp;
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
if (jcp)
delete jcp;
@@ -281,7 +281,7 @@
enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
txn_commit(&jc, xid);
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
std::stringstream ss;
ss << e;
@@ -300,7 +300,7 @@
txn_commit(jcp, xid);
delete jcp;
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
if (jcp)
delete jcp;
@@ -335,7 +335,7 @@
}
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
std::stringstream ss;
ss << e;
@@ -370,7 +370,7 @@
delete jcp;
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
if (jcp)
delete jcp;
@@ -408,7 +408,7 @@
}
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
std::stringstream ss;
ss << e;
@@ -444,7 +444,7 @@
delete jcp;
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
if (jcp)
delete jcp;
@@ -487,7 +487,7 @@
}
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
std::stringstream ss;
ss << e;
@@ -529,7 +529,7 @@
delete jcp;
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
if (jcp)
delete jcp;
@@ -575,7 +575,7 @@
}
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
std::stringstream ss;
ss << e;
@@ -618,7 +618,7 @@
delete jcp;
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
if (jcp)
delete jcp;
@@ -663,7 +663,7 @@
deq_msg(&jc, m);
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
std::stringstream ss;
ss << e;
@@ -707,7 +707,7 @@
delete jcp;
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
if (jcp)
delete jcp;
@@ -755,7 +755,7 @@
deq_msg(&jc, m);
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
std::stringstream ss;
ss << e;
@@ -800,7 +800,7 @@
delete jcp;
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
if (jcp)
delete jcp;
@@ -871,7 +871,7 @@
deq_msg(&jc, m);
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
std::stringstream ss;
ss << e;
@@ -941,7 +941,7 @@
delete jcp;
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
if (jcp)
delete jcp;
@@ -1019,7 +1019,7 @@
deq_msg(&jc, m);
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
std::stringstream ss;
ss << e;
@@ -1095,7 +1095,7 @@
delete jcp;
}
}
- catch (rhm::journal::jexception& e)
+ catch (const rhm::journal::jexception& e)
{
if (jcp)
delete jcp;
Modified: store/trunk/cpp/tests/jrnl/rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/rtest 2007-11-01 21:00:00 UTC (rev 1208)
+++ store/trunk/cpp/tests/jrnl/rtest 2007-11-02 13:30:43 UTC (rev 1209)
@@ -58,8 +58,8 @@
RM_DIR="${RM} -rf"
TEST_PROG="./jtest"
CHK_PROG="./janalyze.py"
-#VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high
--show-reachable=yes"
-VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high
--show-reachable=yes --suppressions=/usr/lib/valgrind/glibc-2.5.supp"
+VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high
--show-reachable=yes"
+#VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high
--show-reachable=yes --suppressions=/usr/lib/valgrind/glibc-2.5.supp"
MAKE="make -f Makefile.rtest"