Author: cctrieloff
Date: 2007-10-04 15:54:30 -0400 (Thu, 04 Oct 2007)
New Revision: 964
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/TxnCtxt.h
Log:
code clean up for async tx / dtx
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-04 19:52:33 UTC (rev 963)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-04 19:54:30 UTC (rev 964)
@@ -391,7 +391,7 @@
journal::data_tok dtokp;
size_t readSize = 0;
// char** buff = 0;
- unsigned aio_sleep_cnt = 0;
+// unsigned aio_sleep_cnt = 0;
unsigned msg_count=0;
bool read = true;
@@ -446,10 +446,10 @@
break;
}
case rhm::journal::RHM_IORES_AIO_WAIT:
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+/* if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
{
THROW_STORE_EXCEPTION("Store error, disk time out on recover for:" + queue->getName());
- }
+ }*/
::usleep(AIO_SLEEP_TIME);
break;
case rhm::journal::RHM_IORES_EMPTY:
@@ -792,7 +792,7 @@
dtokp->setSourceMessage (&message);
dtokp->set_rid(message.getPersistenceId()); // set the messageID into the Journal header (record-id)
- unsigned aio_sleep_cnt = 0;
+// unsigned aio_sleep_cnt = 0;
bool written = false;
while (!written)
{
@@ -805,13 +805,12 @@
written = true;
break;
case rhm::journal::RHM_IORES_AIO_WAIT:
- if (aio_sleep_cnt >= MAX_AIO_SLEEPS){
+/* if (++aio_sleep_cnt >= MAX_AIO_SLEEPS){
delete dtokp;
THROW_STORE_EXCEPTION("Error storing message -- AIO timeout for: " + queue->getName());
- }
- usleep(AIO_SLEEP_TIME);
+ }*/
+ usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
jc->get_wr_events();
- aio_sleep_cnt++;
break;
case rhm::journal::RHM_IORES_FULL:
delete dtokp;
@@ -892,7 +891,7 @@
void BdbMessageStore::async_dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
{
- unsigned aio_sleep_cnt = 0;
+// unsigned aio_sleep_cnt = 0;
bool written = false;
journal::data_tok* ddtokp = new journal::data_tok;
ddtokp->setSourceMessage (&msg);
@@ -923,13 +922,12 @@
written = true;
break;
case rhm::journal::RHM_IORES_AIO_WAIT:
- if (aio_sleep_cnt >= MAX_AIO_SLEEPS){
+/* if (++aio_sleep_cnt >= MAX_AIO_SLEEPS){
delete ddtokp;
THROW_STORE_EXCEPTION("Error dequeuing message -- AIO timeout for: " + queue.getName());
- }
+ } */
+ usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
jc->get_wr_events();
- usleep(AIO_SLEEP_TIME);
- aio_sleep_cnt++;
break;
default:
delete ddtokp;
@@ -1017,7 +1015,7 @@
auto_ptr<TransactionContext> BdbMessageStore::begin()
{
- TxnCtxt* txn(new TxnCtxt());
+ TxnCtxt* txn(new TxnCtxt(true));
txn->begin(env);
return auto_ptr<TransactionContext>(txn);
}
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2007-10-04 19:52:33 UTC (rev 963)
+++ store/trunk/cpp/lib/TxnCtxt.h 2007-10-04 19:54:30 UTC (rev 964)
@@ -47,6 +47,7 @@
ipqdef impactedQueues; // list of Queues used in the txn
static unsigned int count;
mutable qpid::sys::Mutex Lock;
+ bool loggedtx;
unsigned int getCount() {
qpid::sys::Mutex::ScopedLock locker(Lock);
@@ -73,8 +74,8 @@
public:
- TxnCtxt() : txn(0) {
- tid = "rhm-tid" + getCount();
+ TxnCtxt(bool _loggedtx=false) : loggedtx(_loggedtx), txn(0) {
+ if (loggedtx) tid = "rhm-tid" + getCount();
}
/**
@@ -83,15 +84,12 @@
*@return if the data sucessfully synced.
*/
void sync(){
- bool allWritten = true;
+ bool allWritten = false;
bool firstloop = true;
- unsigned aio_sleep_cnt = 0;
- while (!allWritten){
- if (!firstloop) ::usleep(AIO_SLEEP_TIME);
+ while (loggedtx && !allWritten){
+ if (!firstloop) ::usleep(AIO_SLEEP_TIME); // move this into the get events call aiolib..
allWritten = true;
- unsigned qcnt = 0;
for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
- qcnt ++;
journal::jcntl* jc = static_cast<journal::jcntl*>((*i)->getExternalQueueStore());
if (jc && !(jc->is_txn_synced(getXid())))
{
@@ -102,10 +100,6 @@
}
}
firstloop = false;
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS*qcnt)
- {
- THROW_STORE_EXCEPTION("Store error, disk time out on sync for:" + getXid());
- }
}
}