Author: cctrieloff
Date: 2007-10-11 17:09:42 -0400 (Thu, 11 Oct 2007)
New Revision: 1011
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
Log:
use dtx methods
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-11 20:20:39 UTC (rev 1010)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-11 21:09:42 UTC (rev 1011)
@@ -390,8 +390,6 @@
JournalImpl* jc =
static_cast<JournalImpl*>(queue->getExternalQueueStore());
DataTokenImpl dtokp;
size_t readSize = 0;
-// char** buff = 0;
-// unsigned aio_sleep_cnt = 0;
unsigned msg_count=0;
bool read = true;
@@ -405,12 +403,8 @@
try {
while (read) {
-//std:: cout << "loop -- uses fixed size -> FIX <-" << std::endl;
-
-// const iores read_data(void** dbuf, size_t& dbsize, data_tok* const dtok)
rhm::journal::iores res = jc->read_data_record(&dbuff, dbuffSize,
&xidbuff, xidbuffSize, transientFlag, &dtokp);
readSize = dtokp.dsize();
-// assert(readSize < buffSize); /// fail safe for hack...
switch (res)
{
@@ -430,9 +424,7 @@
Buffer contentBuff(data + contentOffset, contentSize);
msg->decodeContent(contentBuff);
}
- // TODO - change to prep list based on reading state from journal
- // -- add to prepared.enqued list..
- if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid())) {
+ if (xidbuffSize > 0 && PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid()) ) {
prepared[dtokp.rid()] = msg;
} else {
queue->recover(msg);
@@ -798,7 +790,12 @@
while (!written)
{
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
- rhm::journal::iores eres = jc->enqueue_data_record(buff, size, size, dtokp, /*txn->getXid(),*/ false);
+ rhm::journal::iores eres;
+ if (txn->getXid().empty()){
+ eres = jc->enqueue_data_record(buff, size, size, dtokp, false);
+ }else {
+ eres = jc->enqueue_txn_data_record(buff, size, size, dtokp, txn->getXid(), false);
+ }
switch (eres)
{
case rhm::journal::RHM_IORES_SUCCESS:
@@ -910,7 +907,11 @@
{
rhm::journal::iores dres;
try {
- dres = jc->dequeue_txn_data_record(ddtokp, tid);
+ if (tid.empty()){
+ dres = jc->dequeue_data_record(ddtokp);
+ } else {
+ dres = jc->dequeue_txn_data_record(ddtokp, tid);
+ }
} catch (rhm::journal::jexception& e) {
std::string str;
delete ddtokp;