Author: kpvdr
Date: 2007-09-24 16:16:06 -0400 (Mon, 24 Sep 2007)
New Revision: 943
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/pmgr.hpp
store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
store/trunk/cpp/tests/jrnl/Makefile.rtest
store/trunk/cpp/tests/jrnl/msg_consumer.cpp
store/trunk/cpp/tests/jrnl/msg_producer.cpp
Log:
Added new jcntl provisional DTX interface
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-24 18:04:55 UTC (rev 942)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-09-24 20:16:06 UTC (rev 943)
@@ -164,7 +164,7 @@
if (eqs)
{
journal::jcntl* jQueue = static_cast<journal::jcntl*>(eqs);
- jQueue->delete_jrnl();
+ jQueue->delete_jrnl_files();
queue.setExternalQueueStore(NULL); // will delete the journal if exists
}
@@ -314,7 +314,7 @@
{
jQueue->recover(); // start recovery
recoverMessages(txn, registry, queue, prepared, messages);
- jQueue->recovered(); // start journal.
+ jQueue->recover_complete(); // start journal.
} catch (journal::jexception& e) {
std::string s;
THROW_STORE_EXCEPTION(e.to_string(s) + queueName);
@@ -415,7 +415,7 @@
std:: cout << "loop -- uses fixed size -> FIX <-" <<
std::endl;
// const iores read_data(void** dbuf, size_t& dbsize, data_tok* const dtok)
- rhm::journal::iores res = jc->read_data(&buff, buffSize, &dtokp);
+ rhm::journal::iores res = jc->read_next_data_record(&buff, buffSize,
&dtokp);
readSize = dtokp.dsize();
assert(readSize < buffSize); /// fail safe for hack...
@@ -798,7 +798,7 @@
while (!written)
{
journal::jcntl* jc =
static_cast<journal::jcntl*>(queue->getExternalQueueStore());
- rhm::journal::iores eres = jc->enqueue_data(buff, size, dtokp);
+ rhm::journal::iores eres = jc->enqueue_data_record(buff, size,
dtokp);
switch (eres)
{
case rhm::journal::RHM_IORES_SUCCESS:
@@ -904,7 +904,7 @@
{
rhm::journal::iores dres;
try {
- dres = jc->dequeue_data(ddtokp);
+ dres = jc->dequeue_data_record(ddtokp);
} catch (rhm::journal::jexception& e) {
std::string str;
delete ddtokp;
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-24 18:04:55 UTC (rev 942)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-09-24 20:16:06 UTC (rev 943)
@@ -173,7 +173,7 @@
}
void
-jcntl::recovered() throw (jexception)
+jcntl::recover_complete() throw (jexception)
{
if (!_readonly_flag)
throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl",
"recovered");
@@ -185,7 +185,7 @@
}
void
-jcntl::delete_jrnl() throw (jexception)
+jcntl::delete_jrnl_files() throw (jexception)
{
stop(true); // wait for AIO to complete
_jdir.delete_dir();
@@ -193,7 +193,7 @@
const iores
-jcntl::enqueue_data(const void* const dbuf, const size_t dlen, data_tok* const dtok)
+jcntl::enqueue_data_record(const void* const dbuf, const size_t dlen, data_tok* const
dtok)
throw (jexception)
{
check_wstatus("enqueue_data");
@@ -201,14 +201,35 @@
}
const iores
-jcntl::read_data(void* const dbuf, const size_t dbsize, data_tok* const dtok) throw
(jexception)
+jcntl::enqueue_data_record(const void* const /*data_buff*/, const size_t
/*tot_data_len*/,
+ const size_t /*this_data_len*/, const void* const /*xid_buff*/, const size_t
/*xid_len*/,
+ data_tok* /*dtok*/) throw (jexception)
{
+ return RHM_IORES_NOTIMPL;
+}
+
+const iores
+jcntl::get_next_data_record(data_tok* /*dtok*/) throw (jexception)
+{
+ return RHM_IORES_NOTIMPL; // provisional interface stub: not yet implemented
+}
+
+const iores
+jcntl::discard_next_data_record(data_tok* /*dtok*/) throw (jexception)
+{
+ return RHM_IORES_NOTIMPL; // provisional interface stub: not yet implemented
+}
+
+const iores
+jcntl::read_next_data_record(void* const dbuf, const size_t dbsize, data_tok* const
dtok)
+ throw (jexception)
+{
check_rstatus("read_data");
return _rmgr.read(dbuf, dbsize, dtok);
}
const iores
-jcntl::dequeue_data(data_tok* const dtok) throw (jexception)
+jcntl::dequeue_data_record(data_tok* const dtok) throw (jexception)
{
check_wstatus("dequeue_data");
return _wmgr.dequeue(dtok);
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-24 18:04:55 UTC (rev 942)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2007-09-24 20:16:06 UTC (rev 943)
@@ -56,7 +56,6 @@
{
/**
- * \class jcntl
* \brief Access and control interface for the journal. This is the top-level class
for the
* journal.
*
@@ -70,7 +69,6 @@
{
private:
/**
- * \var _jid
* \brief Journal ID
*
* This string uniquly identifies this journal instance. It will most likely be
associated
@@ -80,7 +78,6 @@
std::string _jid;
/**
- * \var _jdir
* \brief Journal directory
*
* This string stores the path to the journal directory. It may be absolute or
relative, and
@@ -90,7 +87,6 @@
jdir _jdir;
/**
- * \var _base_filename
* \brief Base filename
*
* This string contains the base filename used for the journal files. The
filenames will
@@ -101,7 +97,6 @@
std::string _base_filename;
/**
- * \var _init_flag
* \brief Initialized flag
*
* This flag starts out set to false, is set to true once this object has been
initilaized,
@@ -110,7 +105,6 @@
bool _init_flag;
/**
- * \var _stop_flag
* \brief Stopped flag
*
* This flag starts out false, and is set to true when stop() is called. At this
point, the
@@ -119,8 +113,20 @@
*/
bool _stop_flag;
+ /**
+ * \brief Read-only state flag used during recover.
+ *
+ * When true, this flag prevents journal write operations (enqueue and dequeue),
but
+ * allows read to occur. It is used during recovery, and is reset when
recover_complete() is
+ * called.
+ */
bool _readonly_flag;
+ /**
+ * \brief If set, calls stop() if the journal write pointer overruns dequeue low
water
+ * marker. If not set, then attempts to write will throw exceptions until the
journal
+ * file low water marker moves to the next journal file.
+ */
bool _autostop; ///< Autostop flag - stops journal when overrun
occurs
// Journal control structures
@@ -161,23 +167,29 @@
* <b>NOTE: Any existing journal will be ignored by this
operation.</b> To use recover
* the data from an existing journal, use recover().
*
- * <b>NOTE: if NULL is passed to the deque's they are created internally
and deleted
- * intenally </b>
+ * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they
will be internally created
+ * and deleted.</b>
*
- * <b>NOTE: if NULL is passed to the callbacks internal ones will be used.
</b>
+ * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal
default callbacks will be
+ * used.</b>
*
* \param rd_dtokl deque for storing data tokens retruning from read AIO
operations.
* \param rd_cb Function pointer to callback function for read operations. May be
NULL.
* \param wr_dtokl deque for storing data tokens retruning from enqueue and
dequeue (write)
* AIO operations.
* \param wr_cb Function pointer to callback function for write operations. May be
NULL.
+ *
+ * \exception ??
*/
void initialize(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb) throw (jexception);
/**
- * Initialize using internal default callbacks and data_tok lists.
+ * \brief Initialize using internal default callbacks and data_tok lists.
+ *
* TODO: Move to JournalImpl later
+ *
+ * \exception ??
*/
void initialize() throw (jexception)
{
@@ -185,29 +197,67 @@
&aio_wr_callback );
}
+ /**
+ * \brief Initialize journal by recovering state from previously written journal.
+ *
+ * Initialize journal by recovering state from previously written journal. The
journal files
+ * are analyzed, and all records that have not been dequeued and that remain in
the journal
+ * will be available for reading. The journal is placed in a read-only state
until
+ * recover_complete() is called; any calls to enqueue or dequeue will fail with an
exception
+ * in this state.
+ *
+ * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they
will be internally created
+ * and deleted.</b>
+ *
+ * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal
default callbacks will be
+ * used.</b>
+ *
+ * \param rd_dtokl deque for storing data tokens returning from read AIO
operations.
+ * \param rd_cb Function pointer to callback function for read operations. May be
NULL.
+ * \param wr_dtokl deque for storing data tokens returning from enqueue and
dequeue (write)
+ * AIO operations.
+ * \param wr_cb Function pointer to callback function for write operations. May be
NULL.
+ *
+ * \exception ??
+ */
void recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb) throw (jexception);
+ /**
+ * \brief Recover using internal default callbacks and data_tok lists.
+ *
+ * TODO: Move to JournalImpl later
+ *
+ * \exception ??
+ */
void recover() throw (jexception)
{
recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback,
&_aio_wr_cmpl_dtok_list,
&aio_wr_callback );
}
-
- void recovered() throw (jexception);
+ /**
+ * \brief Notification to the journal that recovery is complete and that normal
operation
+ * may resume.
+ *
+ * This call notifies the journal that recovery is complete and that normal
operation
+ * may resume. The read pointers are reset so that all records read as a part of
recover
+ * may be re-read during normal operation. The read-only flag is then reset,
allowing
+ * enqueue and dequeue operations to resume.
+ *
+ * \exception ??
+ */
+ void recover_complete() throw (jexception);
/**
- * \brief Delete the journal directory of files matching the base filename
- * by moving them into a subdirectory
- *
- * Stops AIO, stop journal, deletes files.....
+ * \brief Stops journal and deletes all journal files.
*
- * \exception The directory handle could not be closed.
+ * Clear the journal directory of all journal files matching the base filename.
+ *
+ * \exception ??
*/
- void delete_jrnl() throw (jexception);
+ void delete_jrnl_files() throw (jexception);
-
/**
* \brief Enqueues (writes) data to the journal.
*
@@ -215,11 +265,55 @@
* \param dlen Size of data (in bytes) in dbuf to be written.
* \param dtok Pointer to data_tok instance for this data, used to track state of
data
* through journal.
+ *
+ * \exception ??
*/
- const iores enqueue_data(const void* const dbuf, const size_t dlen, data_tok*
const dtok)
+ const iores enqueue_data_record(const void* const dbuf, const size_t dlen,
+ data_tok* const dtok) throw (jexception);
+
+ /**
+ * \brief Enqueue data or part of data
+ *
+ * \param data_buff
+ * \param tot_data_len
+ * \param this_data_len
+ * \param xid_buff
+ * \param xid_len
+ * \param dtok
+ *
+ * \exception ??
+ */
+ const iores enqueue_data_record(
+ const void* const data_buff, ///< pointer to data to be enqueued in
this enq op
+ const size_t tot_data_len, ///< total data length
+ const size_t this_data_len, ///< amount to be written in this enq
op
+ const void* const xid_buff, ///< pointer to xid
+ const size_t xid_len, ///< xid length
+ data_tok* dtok) ///< pointer to data token instance
throw (jexception);
/**
+ * \brief Retrieve details of next record to be read without consuming the
record.
+ *
+ * Retrieve details of next record to be read without consuming the record. A
pointer to
+ * the data is also returned.
+ *
+ * \param dtok
+ *
+ * \exception ??
+ */
+ const iores get_next_data_record(data_tok* dtok) throw (jexception);
+
+ /**
+ * \brief Discard (skip) next record to be read without reading or retrieving it.
+ *
+ * \param dtok
+ *
+ * \exception ??
+ */
+ const iores discard_next_data_record(data_tok* dtok) throw (jexception);
+
+ /**
* \brief Reads data from the journal.
*
* \param dbuf Pointer to buffer into which data is to be read.
@@ -227,26 +321,67 @@
* dbuf.
* \param dtok Pointer to data_tok instance for this data, used to track state of
data
* through journal.
+ *
+ * \exception ??
*/
- const iores read_data(void* const dbuf, const size_t dbsize, data_tok* const
dtok)
- throw (jexception);
-
-// const iores read_data(void** dbuf, size_t& dbsize, data_tok* const dtok)
-// throw (jexception);
+ const iores read_next_data_record(void* const dbuf, const size_t dbsize,
+ data_tok* const dtok) throw (jexception);
/**
- * \brief Dequeues (marks as no longer needed) data from journal.
+ * \brief Dequeues (marks as no longer needed) data record in journal.
*
- * Dequeues (marks as no longer needed) data from journal. Note that it is
imperative to use
- * the same data token instance used to enqueue this data; it contains the record
ID needed
- * to correctly mark this data as dequeued in the journal.
+ * Dequeues (marks as no longer needed) data record in journal. Note that it is
possible
+ * to use the same data token instance used to enqueue this data; it contains the
record ID
+ * needed to correctly mark this data as dequeued in the journal. Otherwise the
RID of the
+ * record to be dequeued and the write state of ENQ must be manually set in a new
or reset
+ * instance of data_tok.
*
- * \param dtok Pointer to data_tok instance for this data, used to track state
of data
+ * \param dtok Pointer to data_tok instance for this data, used to track state of
data
* through journal.
+ *
+ * \exception ??
*/
- const iores dequeue_data(data_tok* const dtok) throw (jexception);
+ const iores dequeue_data_record(data_tok* const dtok) throw (jexception);
/**
+ * \brief Abort the transaction for all records enqueued or dequeued with the
matching xid.
+ *
+ * Abort the transaction for all records enqueued with the matching xid. All
enqueued records
+ * are effectively deleted from the journal, and can not be read. All dequeued
records remain
+ * as though they had never been dequeued.
+ *
+ * \param xid_buff
+ * \param xid_len
+ *
+ * \exception ??
+ */
+ const iores abort_xid(const void* const xid_buff, const size_t xid_len) throw
(jexception);
+
+ /**
+ * \brief Commit the transaction for all records enqueued or dequeued with the
matching xid.
+ *
+ * Commit the transaction for all records enqueued with the matching xid. All
enqueued
+ * records are effectively released for reading and dequeueing. All dequeued
records are
+ * removed and can no longer be accessed.
+ *
+ * \param xid_buff
+ * \param xid_len
+ *
+ * \exception ??
+ */
+ const iores commit_xid(const void* const xid_buff, const size_t xid_len) throw
(jexception);
+
+ /**
+ * \brief Check whether all the enqueue records for the given xid have reached
disk.
+ *
+ * \param xid_buff
+ * \param xid_len
+ *
+ * \exception ??
+ */
+ const bool is_synced(const void* const xid_buff, const size_t xid_len) throw
(jexception);
+
+ /**
* \brief Forces a check for returned AIO write events.
*
* Forces a check for returned AIO write events. This is normally performed by
enqueue() and
Modified: store/trunk/cpp/lib/jrnl/pmgr.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-09-24 18:04:55 UTC (rev 942)
+++ store/trunk/cpp/lib/jrnl/pmgr.hpp 2007-09-24 20:16:06 UTC (rev 943)
@@ -64,7 +64,9 @@
RHM_IORES_AIO_WAIT, ///< IO operation suspended as all pages in cache are
waiting for AIO.
RHM_IORES_EMPTY, ///< During read operations, nothing further is available
to read.
RHM_IORES_FULL, ///< During write operations, the journal files are full.
- RHM_IORES_BUSY ///< Another blocking operation is in progress.
+ RHM_IORES_BUSY, ///< Another blocking operation is in progress.
+ RHM_IORES_TXPENDING, ///< Operation blocked by pending transaction.
+ RHM_IORES_NOTIMPL ///< Not yet implemented.
};
typedef _iores iores;
Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-09-24 18:04:55 UTC (rev 942)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp 2007-09-24 20:16:06 UTC (rev 943)
@@ -1,5 +1,5 @@
/**
-* \file JournalTest.cpp
+* \file JournalSystemTests.cpp
*
* Red Hat Messaging - Message Journal
*
@@ -181,7 +181,7 @@
{
rhm::journal::jcntl jc(test_name, "jdata", test_name);
jc.recover();
- jc.recovered();
+ jc.recover_complete();
}
}
catch (rhm::journal::jexception& e)
@@ -216,7 +216,7 @@
rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name,
"jdata", test_name);
CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.",
jcp != NULL);
jcp->recover();
- jcp->recovered();
+ jcp->recover_complete();
delete jcp;
}
}
@@ -350,7 +350,7 @@
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
- jc.recovered();
+ jc.recover_complete();
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
@@ -386,7 +386,7 @@
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);
- jcp->recovered();
+ jcp->recover_complete();
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);
@@ -420,7 +420,7 @@
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
- jc.recovered();
+ jc.recover_complete();
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
@@ -458,7 +458,7 @@
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);
- jcp->recovered();
+ jcp->recover_complete();
for (int m=0; m<NUM_MSGS; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);
@@ -502,7 +502,7 @@
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(&jc)) == 0);
- jc.recovered();
+ jc.recover_complete();
// rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
enq_msg(&jc, create_msg(msg, m));
@@ -558,7 +558,7 @@
for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
create_msg(msg, m).compare(read_msg(jcp)) == 0);
- jcp->recovered();
+ jcp->recover_complete();
// rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
enq_msg(jcp, create_msg(msg, m));
@@ -745,7 +745,7 @@
CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp !=
NULL);
unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->enqueue_data(msg.c_str(), msg.size(), dtp),
jc,
+ while (handle_jcntl_response(jc->enqueue_data_record(msg.c_str(), msg.size(),
dtp), jc,
aio_sleep_cnt, dtp));
}
@@ -757,7 +757,7 @@
dtp->set_rid(rid);
unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->dequeue_data(dtp), jc, aio_sleep_cnt, dtp));
+ while (handle_jcntl_response(jc->dequeue_data_record(dtp), jc, aio_sleep_cnt,
dtp));
}
char* read_msg(rhm::journal::jcntl* jc)
@@ -770,8 +770,8 @@
dtp->set_wstate(rhm::journal::data_tok::ENQ);
unsigned aio_sleep_cnt = 0;
- while (handle_jcntl_response(jc->read_data(buff, MAX_MSG_SIZE, dtp), jc,
aio_sleep_cnt,
- dtp));
+ while (handle_jcntl_response(jc->read_next_data_record(buff, MAX_MSG_SIZE,
dtp), jc,
+ aio_sleep_cnt, dtp));
buff[dtp->dsize()] = '\0';
return buff;
}
Modified: store/trunk/cpp/tests/jrnl/Makefile.rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-24 18:04:55 UTC (rev 942)
+++ store/trunk/cpp/tests/jrnl/Makefile.rtest 2007-09-24 20:16:06 UTC (rev 943)
@@ -40,6 +40,7 @@
SHELL = /bin/bash
+QPID_HOME_DIR = $(HOME)/qpid
RHM_JRNL_SRC_DIR = ../../lib/jrnl
RHM_JRNL_DOC_DIR = ../../docs
@@ -73,7 +74,7 @@
CXX = g++
CXXINCLUDES = -I. -I../../lib -I../../../../qpid/cpp/src -I../../../../qpid/cpp/src/gen
CXXFLAGS = $(RHM_DEFINES) -Wall -Wextra -Werror -Wno-shadow -Wpointer-arith -Wcast-qual
-Wcast-align -Wno-long-long -Wvolatile-register-var -Winvalid-pch -Wno-system-headers
-pedantic -ggdb -O0 -pthread $(CXXINCLUDES)
-LDFLAGS = -lpthread -laio -lrt -lqpidcommon -L/home/kpvdr/redhat/qpid/cpp/src/.libs
+LDFLAGS = -lpthread -laio -lrt -lqpidcommon -L$(QPID_HOME_DIR)/cpp/src/.libs
.SUFFIXES:
.SUFFIXES: .cpp .o
Modified: store/trunk/cpp/tests/jrnl/msg_consumer.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_consumer.cpp 2007-09-24 18:04:55 UTC (rev 942)
+++ store/trunk/cpp/tests/jrnl/msg_consumer.cpp 2007-09-24 20:16:06 UTC (rev 943)
@@ -123,7 +123,8 @@
{
if (dtokp->wstate() >= rhm::journal::data_tok::ENQ)
{
- rhm::journal::iores res = _jcntl.read_data((void* const)_msg_buff,
buffSize, dtokp);
+ rhm::journal::iores res = _jcntl.read_next_data_record((void*
const)_msg_buff,
+ buffSize, dtokp);
rhm::journal::data_tok::read_state rs = dtokp->rstate();
rhm::journal::data_tok::write_state ws = dtokp->wstate();
switch (res)
Modified: store/trunk/cpp/tests/jrnl/msg_producer.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/msg_producer.cpp 2007-09-24 18:04:55 UTC (rev 942)
+++ store/trunk/cpp/tests/jrnl/msg_producer.cpp 2007-09-24 20:16:06 UTC (rev 943)
@@ -142,7 +142,7 @@
bool written = false;
while (!written)
{
- rhm::journal::iores eres = jc.enqueue_data(msg, size, dtokp);
+ rhm::journal::iores eres = jc.enqueue_data_record(msg, size, dtokp);
rhm::journal::data_tok::write_state ws = dtokp->wstate();
const char* wsstr = dtokp->wstate_str();
switch (eres)
@@ -293,7 +293,7 @@
bool written = false;
while (!written)
{
- rhm::journal::iores dres = _jcptr->dequeue_data(ddtokp);
+ rhm::journal::iores dres = _jcptr->dequeue_data_record(ddtokp);
const char* wsstr = ddtokp->wstate_str();
switch (dres)
{