Author: kpvdr
Date: 2007-11-07 22:59:29 -0500 (Wed, 07 Nov 2007)
New Revision: 1265
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
Log:
Added temporary function JournalImpl::loadMsgContent(), which holds the last message read
from the journal so that it can be read in parts.
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-11-07 20:11:49 UTC (rev 1264)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-11-08 03:59:29 UTC (rev 1265)
@@ -22,6 +22,7 @@
*/
#include "JournalImpl.h"
+
#include "jrnl/jerrno.hpp"
#include <qpid/sys/Monitor.h>
@@ -41,7 +42,11 @@
jcntl(journalId, journalDirectory, journalBaseFilename),
getEventsTimerSetFlag(false),
writeActivityFlag(false),
- flushTriggeredFlag(true)
+ flushTriggeredFlag(true),
+ _datap(0),
+ _dlen(0),
+ _dtok(),
+ _external(false)
{
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
@@ -91,6 +96,64 @@
}
}
+#define MAX_AIO_SLEEPS 500
+#define AIO_SLEEP_TIME 1000000
+// Appends `length` bytes, starting at byte `offset`, of the content of message `rid`
+// to `data`.
+//
+// The message is cached in _datap/_dlen and is only (re)read through
+// read_data_record() when `rid` differs from the rid currently held by _dtok. While
+// the read reports RHM_IORES_AIO_WAIT it is retried up to MAX_AIO_SLEEPS times,
+// calling get_wr_events() and sleeping AIO_SLEEP_TIME microseconds before each retry.
+//
+// Returns true if the record is flagged external (nothing is appended to `data`),
+// false if the requested range was appended.
+//
+// Throws jexception if the read keeps reporting RHM_IORES_AIO_WAIT after
+// MAX_AIO_SLEEPS retries, if it returns any other non-success result, or if
+// [offset, offset + length) does not lie inside the cached message.
+const bool
+JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t offset, size_t length)
+    throw (journal::jexception)
+{
+    if (_dtok.rid() != rid)
+    {
+        // A different message is requested: clear the cached state and prime the
+        // data token for the new read.
+        // NOTE(review): if the read below throws, _dtok presumably still carries
+        // `rid`, so a later call with the same rid would skip the read and work on
+        // the cleared cache - confirm callers do not retry after an exception.
+        _datap = 0;
+        _dlen = 0;
+        _dtok.reset();
+        _dtok.set_rid(rid);
+        _dtok.set_wstate(journal::data_tok::ENQ);
+        _external = false;
+        void* xidp = 0;
+        size_t xlen = 0;
+        bool transient = false;
+        bool done = false;
+        unsigned aio_sleep_cnt = 0;
+        while (!done)
+        {
+            iores res = read_data_record(&_datap, _dlen, &xidp, xlen, transient, _external, &_dtok);
+            if (res == journal::RHM_IORES_SUCCESS) {
+                done = true;
+            } else if (res == journal::RHM_IORES_AIO_WAIT) {
+                // Not available yet: process write events, wait, then try again.
+                if (++aio_sleep_cnt <= MAX_AIO_SLEEPS) {
+                    get_wr_events();
+                    usleep(AIO_SLEEP_TIME);
+                } else {
+                    std::stringstream ss;
+                    ss << "read_data_record() returned " << journal::pmgr::iores_str(res);
+                    ss << "; exceeded maximum wait time";
+                    throw jexception(ss.str());
+                }
+            } else {
+                std::stringstream ss;
+                ss << "read_data_record() returned " << journal::pmgr::iores_str(res);
+                throw jexception(ss.str());
+            }
+        }
+        // Set the correct pointer for deletion by boost::shared_ptr: the xid pointer
+        // when an xid was returned, otherwise the data pointer.
+        // NOTE(review): boost::shared_ptr<char>(p) releases the buffer with `delete p`;
+        // confirm that this matches the way read_data_record() allocates it.
+        if (xlen) {
+            _master = boost::shared_ptr<char>(static_cast<char*>(xidp));
+        } else {
+            _master = boost::shared_ptr<char>(static_cast<char*>(_datap));
+        }
+    }
+    if (_external)
+        return true;
+    // Written so that it cannot wrap around: a plain `offset + length > _dlen` lets
+    // very large arguments overflow size_t and slip past the check.
+    if (offset > _dlen || length > _dlen - offset) {
+        std::stringstream ss;
+        ss << "loadMsgContent(): offset + length exceeds available message size";
+        throw jexception(ss.str());
+    }
+    data.append(static_cast<const char*>(_datap) + offset, length);
+    return false;
+}
+
const iores
JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const bool transient)
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-11-07 20:11:49 UTC (rev 1264)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-11-08 03:59:29 UTC (rev 1265)
@@ -73,6 +73,13 @@
bool writeActivityFlag;
bool flushTriggeredFlag;
qpid::broker::TimerTaskA::intrusive_ptr inactivityFireEventPtr;
+
+ // Temporary member variables holding the message cached by loadMsgContent() below
+ boost::shared_ptr<char> _master;
+ void* _datap;
+ size_t _dlen;
+ journal::data_tok _dtok;
+ bool _external;
public:
JournalImpl(const std::string& journalId,
@@ -93,6 +100,12 @@
recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback,
&_aio_wr_cmpl_dtok_list,
&aio_wr_callback, prep_tx_list, queue_id);
}
+
+ // Temporary function that reads a message from the journal and keeps it cached so
+ // that its content can be assigned in chunks. To be replaced once the code that does
+ // this directly from the journal is ready.
+ // Returns true if the record is external, false if it is local.
+ const bool loadMsgContent(u_int64_t rid, std::string& data, size_t
offset,
+ size_t length) throw (journal::jexception);
// Overrides for write inactivity timer
const journal::iores enqueue_data_record(const void* const data_buff,