[rhmessaging-commits] rhmessaging commits: r1495 - store/trunk/cpp/lib.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Fri Dec 14 14:41:15 EST 2007
Author: kpvdr
Date: 2007-12-14 14:41:15 -0500 (Fri, 14 Dec 2007)
New Revision: 1495
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/StoreException.h
Log:
Fix for BZ423981, but needs testing. Added monitor to JournalImpl.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-14 16:27:22 UTC (rev 1494)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-14 19:41:15 UTC (rev 1495)
@@ -33,7 +33,6 @@
#include "StringDbt.h"
#include "JournalImpl.h"
#include "DataTokenImpl.h"
-#include <boost/intrusive_ptr.hpp>
using namespace rhm::bdbstore;
using namespace qpid::broker;
@@ -996,60 +995,75 @@
dtokp->setSourceMessage(message);
dtokp->set_external_rid(true);
dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
-
- bool written = false;
- unsigned aio_sleep_cnt = 0;
- unsigned busy_sleep_cnt = 0;
- while (!written)
- {
- JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
- rhm::journal::iores eres;
- if (txn->getXid().empty()){
- if (message->isContentReleased()){
- eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
- }else {
- eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
- }
- }else {
- if (message->isContentReleased()){
- eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
- } else {
- eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
- }
+
+ JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
+ if (txn->getXid().empty()){
+ if (message->isContentReleased()){
+ jc->enqueue_extern_data_record(size, dtokp.get(), false);
+ } else {
+ jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
}
- switch (eres)
- {
- case rhm::journal::RHM_IORES_SUCCESS:
- //std::cout << "." << std::flush;
- if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
- written = true;
- aio_sleep_cnt = 0;
- busy_sleep_cnt = 0;
- break;
- case rhm::journal::RHM_IORES_AIO_WAIT:
- //std::cout << "w" << std::flush;
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
- usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
- jc->get_wr_events();
- break;
- case rhm::journal::RHM_IORES_BUSY:
- //std::cout << "b" << std::flush;
- if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
- usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
- break;
- case rhm::journal::RHM_IORES_ENQCAPTHRESH:
- std::cerr << "Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
- THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" + queue->getName() + "\".");
- case rhm::journal::RHM_IORES_FULL:
- std::cerr << "Enqueue: Error storing record -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
- THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Journal full on queue \"" + queue->getName() + "\".");
- break;
- default:
- assert("Store Error: Unexpected msg state");
+ }else {
+ if (message->isContentReleased()){
+ jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
+ } else {
+ jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
}
}
+
+// bool written = false;
+// unsigned aio_sleep_cnt = 0;
+// unsigned busy_sleep_cnt = 0;
+// while (!written)
+// {
+// JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
+// rhm::journal::iores eres;
+// if (txn->getXid().empty()){
+// if (message->isContentReleased()){
+// eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
+// }else {
+// eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
+// }
+// }else {
+// if (message->isContentReleased()){
+// eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
+// } else {
+// eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
+// }
+// }
+// switch (eres)
+// {
+// case rhm::journal::RHM_IORES_SUCCESS:
+// //std::cout << "." << std::flush;
+// if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
+// written = true;
+// aio_sleep_cnt = 0;
+// busy_sleep_cnt = 0;
+// break;
+// case rhm::journal::RHM_IORES_AIO_WAIT:
+// //std::cout << "w" << std::flush;
+// if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+// THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
+// usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
+// jc->get_wr_events();
+// break;
+// case rhm::journal::RHM_IORES_BUSY:
+// //std::cout << "b" << std::flush;
+// if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
+// THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
+// usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+// break;
+// case rhm::journal::RHM_IORES_ENQCAPTHRESH:
+// std::cerr << "Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
+// THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" + queue->getName() + "\".");
+// case rhm::journal::RHM_IORES_FULL:
+// std::cerr << "Enqueue: Error storing record -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
+// THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Journal full on queue \"" + queue->getName() + "\".");
+// break;
+// default:
+// assert("Store Error: Unexpected msg state");
+// }
+// }
} else {
/// cct message db
@@ -1127,7 +1141,6 @@
intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
- bool written = false;
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
ddtokp->addRef();
ddtokp->setSourceMessage(msg);
@@ -1135,56 +1148,66 @@
ddtokp->set_rid(messageIdSequence.next());
ddtokp->set_dequeue_rid(msg->getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
- JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
string tid;
if (ctxt){
TxnCtxt* txn = check(ctxt);
tid = txn->getXid();
}
-
- unsigned aio_sleep_cnt = 0;
- unsigned busy_sleep_cnt = 0;
- while (!written)
- {
- rhm::journal::iores dres;
- try {
- if (tid.empty()){
- dres = jc->dequeue_data_record(ddtokp.get());
- } else {
- dres = jc->dequeue_txn_data_record(ddtokp.get(), tid);
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
+ try {
+ JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
+ if (tid.empty()) {
+ jc->dequeue_data_record(ddtokp.get());
+ } else {
+ jc->dequeue_txn_data_record(ddtokp.get(), tid);
}
- switch (dres)
- {
- case rhm::journal::RHM_IORES_SUCCESS:
- //std::cout << "." << std::flush;
- aio_sleep_cnt = 0;
- busy_sleep_cnt = 0;
- written = true;
- break;
- case rhm::journal::RHM_IORES_AIO_WAIT:
- //std::cout << "w" << std::flush;
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
- usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
- jc->get_wr_events();
- break;
- case rhm::journal::RHM_IORES_BUSY:
- //std::cout << "b" << std::flush;
- if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
- usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
- break;
- case rhm::journal::RHM_IORES_FULL:
- std::cerr << "Dequeue: Error storing record -- Journal full on queue \"" << queue.getName() << "\"." << std::endl << std::flush;
- THROW_STORE_FULL_EXCEPTION("Dequeue: Error storing record -- Journal full on queue \"" + queue.getName() + "\".");
- break;
- default:
- assert("Store Error: Unexpected msg state");
- }
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
}
+
+// bool written = false;
+// unsigned aio_sleep_cnt = 0;
+// unsigned busy_sleep_cnt = 0;
+// while (!written)
+// {
+// rhm::journal::iores dres;
+// try {
+// if (tid.empty()){
+// dres = jc->dequeue_data_record(ddtokp.get());
+// } else {
+// dres = jc->dequeue_txn_data_record(ddtokp.get(), tid);
+// }
+// } catch (const journal::jexception& e) {
+// THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
+// }
+// switch (dres)
+// {
+// case rhm::journal::RHM_IORES_SUCCESS:
+// //std::cout << "." << std::flush;
+// aio_sleep_cnt = 0;
+// busy_sleep_cnt = 0;
+// written = true;
+// break;
+// case rhm::journal::RHM_IORES_AIO_WAIT:
+// //std::cout << "w" << std::flush;
+// if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+// THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
+// usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+// jc->get_wr_events();
+// break;
+// case rhm::journal::RHM_IORES_BUSY:
+// //std::cout << "b" << std::flush;
+// if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
+// THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
+// usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+// break;
+// case rhm::journal::RHM_IORES_FULL:
+// std::cerr << "Dequeue: Error storing record -- Journal full on queue \"" << queue.getName() << "\"." << std::endl << std::flush;
+// THROW_STORE_FULL_EXCEPTION("Dequeue: Error storing record -- Journal full on queue \"" + queue.getName() + "\".");
+// break;
+// default:
+// assert("Store Error: Unexpected msg state");
+// }
+// }
}
bool BdbMessageStore::dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId)
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-12-14 16:27:22 UTC (rev 1494)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-12-14 19:41:15 UTC (rev 1495)
@@ -25,6 +25,7 @@
#include "jrnl/jerrno.hpp"
#include "jrnl/jexception.hpp"
+#include "StoreException.h"
#include <qpid/sys/Monitor.h>
using namespace rhm::bdbstore;
@@ -49,6 +50,7 @@
getEventsTimerSetFlag(false),
writeActivityFlag(false),
flushTriggeredFlag(true),
+ aioWait(false),
_xidp(0),
_datap(0),
_dlen(0),
@@ -168,69 +170,67 @@
return true;
}
-const iores
+void
JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const bool transient)
{
-//std::cout << " " << _jid << ":E" << std::flush;
- return handleInactivityTimer(jcntl::enqueue_data_record(data_buff, tot_data_len,
- this_data_len, dtokp, transient));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ while(handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp,
+ transient)));
}
-const iores
+void
JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp,
const bool transient)
{
-//std::cout << " " << _jid << ":E-ext" << std::flush;
- return handleInactivityTimer(jcntl::enqueue_extern_data_record(tot_data_len, dtokp,
- transient));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ while(handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient)));
}
-const iores
+void
JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, data_tok* dtokp, const std::string& xid,
- const bool transient)
+ const size_t this_data_len, data_tok* dtokp, const std::string& xid, const bool transient)
{
-//std::cout << " " << _jid << ":E-tx" << std::flush;
- return handleInactivityTimer(jcntl::enqueue_txn_data_record(data_buff, tot_data_len,
- this_data_len, dtokp, xid, transient));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ while(handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len,
+ dtokp, xid, transient)));
}
-const iores
+void
JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
const std::string& xid, const bool transient)
{
-//std::cout << " " << _jid << ":E-tx-ext" << std::flush;
- return handleInactivityTimer(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid,
- transient));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ while(handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid,
+ transient)));
}
-const iores
+void
JournalImpl::dequeue_data_record(data_tok* const dtokp)
{
-//std::cout << " " << _jid << ":D" << std::flush;
- return handleInactivityTimer(jcntl::dequeue_data_record(dtokp));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ while(handleIoResult(jcntl::dequeue_data_record(dtokp)));
}
-const iores
+void
JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
{
-//std::cout << " " << _jid << ":D-tx" << std::flush;
- return handleInactivityTimer(jcntl::dequeue_txn_data_record(dtokp, xid));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ while(handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid)));
}
-const iores
+void
JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
{
-//std::cout << " " << _jid << ":A-tx" << std::flush;
- return handleInactivityTimer(jcntl::txn_abort(dtokp, xid));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ while(handleIoResult(jcntl::txn_abort(dtokp, xid)));
}
-const iores
+void
JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid)
{
-//std::cout << " " << _jid << ":C-tx" << std::flush;
- return handleInactivityTimer(jcntl::txn_commit(dtokp, xid));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ while(handleIoResult(jcntl::txn_commit(dtokp, xid)));
}
void
@@ -289,11 +289,42 @@
journalTimer.add(inactivityFireEventPtr);
}
-const iores
-JournalImpl::handleInactivityTimer(const iores r)
+// Turns the iores code returned by a jcntl write-side call into a retry decision.
+// Returns false on RHM_IORES_SUCCESS; on RHM_IORES_AIO_WAIT it waits on writeLock, sleeps
+// 1000us and returns true so the calling while-loop re-issues the operation. Every other
+// code is reported on std::cerr and raised through THROW_STORE_FULL_EXCEPTION.
+// NOTE(review): notifyWriteMonitor() signals only when aioWait is already set, so a
+// completion arriving before "aioWait = true" looks like a lost wake-up - confirm.
+// NOTE(review): RHM_IORES_BUSY was retried by the removed caller loops but now reaches the
+// default case, which raises a store-full exception for a non-full condition - confirm.
+const bool
+JournalImpl::handleIoResult(const iores r)
{
writeActivityFlag = true;
- return r;
+ switch (r)
+ {
+ case rhm::journal::RHM_IORES_SUCCESS:
+ return false;
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+std::cout << "W " << std::flush;
+ aioWait = true;
+ writeLock.wait();
+ usleep(1000);
+ return true;
+ case rhm::journal::RHM_IORES_ENQCAPTHRESH:
+ {
+ std::ostringstream oss;
+ oss << "Enqueue capacity threshold exceeded on queue \"" << _jid << "\".";
+ std::cerr << oss.str() << std::endl << std::flush;
+ THROW_STORE_FULL_EXCEPTION(oss.str());
+ }
+ case rhm::journal::RHM_IORES_FULL:
+ {
+ std::ostringstream oss;
+ oss << "Journal full on queue \"" << _jid << "\".";
+ std::cerr << oss.str() << std::endl << std::flush;
+ THROW_STORE_FULL_EXCEPTION(oss.str());
+ }
+ default:
+ {
+ std::ostringstream oss;
+ oss << "Unexpected I/O response (" << rhm::journal::iores_str(r) << ") on queue \"" << _jid << "\".";
+ std::cerr << oss.str() << std::endl << std::flush;
+ THROW_STORE_FULL_EXCEPTION(oss.str());
+ }
+ }
}
void
@@ -329,6 +360,7 @@
dtokp->release();
this_dtok_list.pop_front();
}
+ jip->notifyWriteMonitor();
}
void
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-12-14 16:27:22 UTC (rev 1494)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-12-14 19:41:15 UTC (rev 1495)
@@ -74,6 +74,9 @@
bool writeActivityFlag;
bool flushTriggeredFlag;
qpid::intrusive_ptr<qpid::broker::TimerTask> inactivityFireEventPtr;
+
+ qpid::sys::Monitor writeLock;
+ bool aioWait;
// temp local vars for loadMsgContent below
void* _xidp;
@@ -118,29 +121,28 @@
size_t length);
// Overrides for write inactivity timer
- const journal::iores enqueue_data_record(const void* const data_buff,
- const size_t tot_data_len, const size_t this_data_len, journal::data_tok* dtokp,
+ void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, journal::data_tok* dtokp,
const bool transient = false);
- const journal::iores enqueue_extern_data_record(const size_t tot_data_len,
- journal::data_tok* dtokp, const bool transient = false);
+ void enqueue_extern_data_record(const size_t tot_data_len, journal::data_tok* dtokp,
+ const bool transient = false);
- const journal::iores enqueue_txn_data_record(const void* const data_buff,
- const size_t tot_data_len, const size_t this_data_len, journal::data_tok* dtokp,
+ void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, journal::data_tok* dtokp, const std::string& xid,
+ const bool transient = false);
+
+ void enqueue_extern_txn_data_record(const size_t tot_data_len, journal::data_tok* dtokp,
const std::string& xid, const bool transient = false);
- const journal::iores enqueue_extern_txn_data_record(const size_t tot_data_len,
- journal::data_tok* dtokp, const std::string& xid, const bool transient = false);
+ void dequeue_data_record(journal::data_tok* const dtokp);
- const journal::iores dequeue_data_record(journal::data_tok* const dtokp);
+ void dequeue_txn_data_record(journal::data_tok* const dtokp, const std::string& xid);
- const journal::iores dequeue_txn_data_record(journal::data_tok* const dtokp,
- const std::string& xid);
+ void txn_abort(journal::data_tok* const dtokp, const std::string& xid);
- const journal::iores txn_abort(journal::data_tok* const dtokp, const std::string& xid);
+ void txn_commit(journal::data_tok* const dtokp, const std::string& xid);
- const journal::iores txn_commit(journal::data_tok* const dtokp, const std::string& xid);
-
void stop(bool block_till_aio_cmpl = false);
// Overrides for get_events timer
@@ -149,9 +151,12 @@
// TimerTask callback
void getEventsFire();
void flushFire();
+
+ // Notify write monitor
+ inline void notifyWriteMonitor() { if (aioWait) { aioWait = false; writeLock.notify(); } }
private:
- const journal::iores handleInactivityTimer(const journal::iores r);
+ const bool handleIoResult(const journal::iores r);
static void aio_wr_callback(jcntl* journal, u_int32_t num_dtoks);
static void aio_rd_callback(jcntl* journal, u_int32_t num_dtoks);
}; // class JournalImpl
Modified: store/trunk/cpp/lib/StoreException.h
===================================================================
--- store/trunk/cpp/lib/StoreException.h 2007-12-14 16:27:22 UTC (rev 1494)
+++ store/trunk/cpp/lib/StoreException.h 2007-12-14 19:41:15 UTC (rev 1495)
@@ -24,6 +24,9 @@
#ifndef _StoreException_
#define _StoreException_
+#include "IdDbt.h"
+#include <boost/format.hpp>
+
namespace rhm{
namespace bdbstore{
More information about the rhmessaging-commits
mailing list