rhmessaging commits: r1496 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-12-14 16:09:53 -0500 (Fri, 14 Dec 2007)
New Revision: 1496
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
Log:
Forgot to remove a block of commented-out code from the last change.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-14 19:41:15 UTC (rev 1495)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-14 21:09:53 UTC (rev 1496)
@@ -1010,64 +1010,9 @@
jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
}
}
-
-// bool written = false;
-// unsigned aio_sleep_cnt = 0;
-// unsigned busy_sleep_cnt = 0;
-// while (!written)
-// {
-// JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
-// rhm::journal::iores eres;
-// if (txn->getXid().empty()){
-// if (message->isContentReleased()){
-// eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
-// }else {
-// eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
-// }
-// }else {
-// if (message->isContentReleased()){
-// eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
-// } else {
-// eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
-// }
-// }
-// switch (eres)
-// {
-// case rhm::journal::RHM_IORES_SUCCESS:
-// //std::cout << "." << std::flush;
-// if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
-// written = true;
-// aio_sleep_cnt = 0;
-// busy_sleep_cnt = 0;
-// break;
-// case rhm::journal::RHM_IORES_AIO_WAIT:
-// //std::cout << "w" << std::flush;
-// if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
-// THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
-// usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
-// jc->get_wr_events();
-// break;
-// case rhm::journal::RHM_IORES_BUSY:
-// //std::cout << "b" << std::flush;
-// if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
-// THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
-// usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
-// break;
-// case rhm::journal::RHM_IORES_ENQCAPTHRESH:
-// std::cerr << "Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
-// THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" + queue->getName() + "\".");
-// case rhm::journal::RHM_IORES_FULL:
-// std::cerr << "Enqueue: Error storing record -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
-// THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Journal full on queue \"" + queue->getName() + "\".");
-// break;
-// default:
-// assert("Store Error: Unexpected msg state");
-// }
-// }
-
} else {
/// cct message db
- if (newId){ // only store in Bd if first time message is stored
+ if (newId) { // only store in Bd if first time message is stored
Dbt data(buff,size);
messageDb.put(txn->get(), &messageId, &data, DB_NOOVERWRITE);
}
@@ -1163,51 +1108,6 @@
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
}
-
-// bool written = false;
-// unsigned aio_sleep_cnt = 0;
-// unsigned busy_sleep_cnt = 0;
-// while (!written)
-// {
-// rhm::journal::iores dres;
-// try {
-// if (tid.empty()){
-// dres = jc->dequeue_data_record(ddtokp.get());
-// } else {
-// dres = jc->dequeue_txn_data_record(ddtokp.get(), tid);
-// }
-// } catch (const journal::jexception& e) {
-// THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
-// }
-// switch (dres)
-// {
-// case rhm::journal::RHM_IORES_SUCCESS:
-// //std::cout << "." << std::flush;
-// aio_sleep_cnt = 0;
-// busy_sleep_cnt = 0;
-// written = true;
-// break;
-// case rhm::journal::RHM_IORES_AIO_WAIT:
-// //std::cout << "w" << std::flush;
-// if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
-// THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
-// usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
-// jc->get_wr_events();
-// break;
-// case rhm::journal::RHM_IORES_BUSY:
-// //std::cout << "b" << std::flush;
-// if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
-// THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
-// usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
-// break;
-// case rhm::journal::RHM_IORES_FULL:
-// std::cerr << "Dequeue: Error storing record -- Journal full on queue \"" << queue.getName() << "\"." << std::endl << std::flush;
-// THROW_STORE_FULL_EXCEPTION("Dequeue: Error storing record -- Journal full on queue \"" + queue.getName() + "\".");
-// break;
-// default:
-// assert("Store Error: Unexpected msg state");
-// }
-// }
}
bool BdbMessageStore::dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId)
rhmessaging commits: r1495 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-12-14 14:41:15 -0500 (Fri, 14 Dec 2007)
New Revision: 1495
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/StoreException.h
Log:
Fix for BZ 423981, but needs testing. Added a monitor to JournalImpl.
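The change replaces the old per-call busy-wait loops in BdbMessageStore with a monitor owned by each JournalImpl: the enqueue/dequeue wrappers now take writeLock, call into jcntl, and retry while handleIoResult() reports RHM_IORES_AIO_WAIT, blocking on the monitor until the AIO write-completion callback calls notifyWriteMonitor(). Below is a condensed, self-contained sketch of that retry/wait/notify shape only; it is not the committed code, the names are simplified, and standard-library synchronisation stands in for qpid::sys::Monitor.

// Sketch only: mirrors the shape of r1495's JournalImpl changes, not the real classes.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>

enum class IoResult { Success, AioWait, Full };   // stand-in for rhm::journal::iores

class JournalSketch {
public:
    // Mirrors the new enqueue_data_record(): lock the write monitor, then repeat the
    // journal call while handleIoResult() says an AIO wait interrupted it.
    void enqueue() {
        std::unique_lock<std::mutex> s(writeLock);        // ScopedLock in the real code
        while (handleIoResult(submit(), s))
            ;
    }

    // Mirrors notifyWriteMonitor(), called from the AIO write-completion callback.
    void notifyWriteMonitor() {
        std::lock_guard<std::mutex> g(writeLock);
        if (aioWait) { aioWait = false; writeCond.notify_all(); }
    }

    ~JournalSketch() { if (completion.joinable()) completion.join(); }

private:
    // Stand-in for the jcntl enqueue call: the first attempt reports an AIO wait and
    // schedules a fake completion callback; the retry succeeds.
    IoResult submit() {
        if (firstAttempt) {
            firstAttempt = false;
            completion = std::thread([this] { notifyWriteMonitor(); });
            return IoResult::AioWait;
        }
        return IoResult::Success;
    }

    // Mirrors handleIoResult(): false means done, true means retry the journal call.
    bool handleIoResult(IoResult r, std::unique_lock<std::mutex>& s) {
        switch (r) {
        case IoResult::Success:
            return false;
        case IoResult::AioWait:
            aioWait = true;
            // The completion thread cannot enter notifyWriteMonitor() until this wait
            // releases writeLock, so the notification cannot be missed.
            writeCond.wait(s, [this] { return !aioWait; });
            return true;
        default:
            throw std::runtime_error("journal full");     // THROW_STORE_FULL_EXCEPTION
        }
    }

    std::mutex writeLock;               // qpid::sys::Monitor writeLock in JournalImpl
    std::condition_variable writeCond;
    bool aioWait = false;
    bool firstAttempt = true;
    std::thread completion;
};

int main() {
    JournalSketch journal;
    journal.enqueue();
    std::cout << "enqueue completed after one simulated AIO wait\n";
}

In the committed diff the AIO_WAIT branch also sleeps briefly after waking, and RHM_IORES_ENQCAPTHRESH and RHM_IORES_FULL are converted into STORE_FULL exceptions, as the handleIoResult() hunk in JournalImpl.cpp below shows.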
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-14 16:27:22 UTC (rev 1494)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-14 19:41:15 UTC (rev 1495)
@@ -33,7 +33,6 @@
#include "StringDbt.h"
#include "JournalImpl.h"
#include "DataTokenImpl.h"
-#include <boost/intrusive_ptr.hpp>
using namespace rhm::bdbstore;
using namespace qpid::broker;
@@ -996,60 +995,75 @@
dtokp->setSourceMessage(message);
dtokp->set_external_rid(true);
dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
-
- bool written = false;
- unsigned aio_sleep_cnt = 0;
- unsigned busy_sleep_cnt = 0;
- while (!written)
- {
- JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
- rhm::journal::iores eres;
- if (txn->getXid().empty()){
- if (message->isContentReleased()){
- eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
- }else {
- eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
- }
- }else {
- if (message->isContentReleased()){
- eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
- } else {
- eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
- }
+
+ JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
+ if (txn->getXid().empty()){
+ if (message->isContentReleased()){
+ jc->enqueue_extern_data_record(size, dtokp.get(), false);
+ } else {
+ jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
}
- switch (eres)
- {
- case rhm::journal::RHM_IORES_SUCCESS:
- //std::cout << "." << std::flush;
- if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
- written = true;
- aio_sleep_cnt = 0;
- busy_sleep_cnt = 0;
- break;
- case rhm::journal::RHM_IORES_AIO_WAIT:
- //std::cout << "w" << std::flush;
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
- usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
- jc->get_wr_events();
- break;
- case rhm::journal::RHM_IORES_BUSY:
- //std::cout << "b" << std::flush;
- if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
- usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
- break;
- case rhm::journal::RHM_IORES_ENQCAPTHRESH:
- std::cerr << "Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
- THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" + queue->getName() + "\".");
- case rhm::journal::RHM_IORES_FULL:
- std::cerr << "Enqueue: Error storing record -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
- THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Journal full on queue \"" + queue->getName() + "\".");
- break;
- default:
- assert("Store Error: Unexpected msg state");
+ }else {
+ if (message->isContentReleased()){
+ jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
+ } else {
+ jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
}
}
+
+// bool written = false;
+// unsigned aio_sleep_cnt = 0;
+// unsigned busy_sleep_cnt = 0;
+// while (!written)
+// {
+// JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
+// rhm::journal::iores eres;
+// if (txn->getXid().empty()){
+// if (message->isContentReleased()){
+// eres = jc->enqueue_extern_data_record(size, dtokp.get(), false);
+// }else {
+// eres = jc->enqueue_data_record(buff, size, size, dtokp.get(), false);
+// }
+// }else {
+// if (message->isContentReleased()){
+// eres = jc->enqueue_extern_txn_data_record(size, dtokp.get(), txn->getXid(), false);
+// } else {
+// eres = jc->enqueue_txn_data_record(buff, size, size, dtokp.get(), txn->getXid(), false);
+// }
+// }
+// switch (eres)
+// {
+// case rhm::journal::RHM_IORES_SUCCESS:
+// //std::cout << "." << std::flush;
+// if (dtokp.get()->wstate() >= DataTokenImpl::ENQ_SUBM)
+// written = true;
+// aio_sleep_cnt = 0;
+// busy_sleep_cnt = 0;
+// break;
+// case rhm::journal::RHM_IORES_AIO_WAIT:
+// //std::cout << "w" << std::flush;
+// if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+// THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
+// usleep(AIO_SLEEP_TIME); // TODO move sleep to wait for IO in get events
+// jc->get_wr_events();
+// break;
+// case rhm::journal::RHM_IORES_BUSY:
+// //std::cout << "b" << std::flush;
+// if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
+// THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
+// usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+// break;
+// case rhm::journal::RHM_IORES_ENQCAPTHRESH:
+// std::cerr << "Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
+// THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Enqueue capacity threshold exceeded on queue \"" + queue->getName() + "\".");
+// case rhm::journal::RHM_IORES_FULL:
+// std::cerr << "Enqueue: Error storing record -- Journal full on queue \"" << queue->getName() << "\"." << std::endl << std::flush;
+// THROW_STORE_FULL_EXCEPTION("Enqueue: Error storing record -- Journal full on queue \"" + queue->getName() + "\".");
+// break;
+// default:
+// assert("Store Error: Unexpected msg state");
+// }
+// }
} else {
/// cct message db
@@ -1127,7 +1141,6 @@
intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
- bool written = false;
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
ddtokp->addRef();
ddtokp->setSourceMessage(msg);
@@ -1135,56 +1148,66 @@
ddtokp->set_rid(messageIdSequence.next());
ddtokp->set_dequeue_rid(msg->getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
- JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
string tid;
if (ctxt){
TxnCtxt* txn = check(ctxt);
tid = txn->getXid();
}
-
- unsigned aio_sleep_cnt = 0;
- unsigned busy_sleep_cnt = 0;
- while (!written)
- {
- rhm::journal::iores dres;
- try {
- if (tid.empty()){
- dres = jc->dequeue_data_record(ddtokp.get());
- } else {
- dres = jc->dequeue_txn_data_record(ddtokp.get(), tid);
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
+ try {
+ JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
+ if (tid.empty()) {
+ jc->dequeue_data_record(ddtokp.get());
+ } else {
+ jc->dequeue_txn_data_record(ddtokp.get(), tid);
}
- switch (dres)
- {
- case rhm::journal::RHM_IORES_SUCCESS:
- //std::cout << "." << std::flush;
- aio_sleep_cnt = 0;
- busy_sleep_cnt = 0;
- written = true;
- break;
- case rhm::journal::RHM_IORES_AIO_WAIT:
- //std::cout << "w" << std::flush;
- if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
- usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
- jc->get_wr_events();
- break;
- case rhm::journal::RHM_IORES_BUSY:
- //std::cout << "b" << std::flush;
- if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
- THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
- usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
- break;
- case rhm::journal::RHM_IORES_FULL:
- std::cerr << "Dequeue: Error storing record -- Journal full on queue \"" << queue.getName() << "\"." << std::endl << std::flush;
- THROW_STORE_FULL_EXCEPTION("Dequeue: Error storing record -- Journal full on queue \"" + queue.getName() + "\".");
- break;
- default:
- assert("Store Error: Unexpected msg state");
- }
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
}
+
+// bool written = false;
+// unsigned aio_sleep_cnt = 0;
+// unsigned busy_sleep_cnt = 0;
+// while (!written)
+// {
+// rhm::journal::iores dres;
+// try {
+// if (tid.empty()){
+// dres = jc->dequeue_data_record(ddtokp.get());
+// } else {
+// dres = jc->dequeue_txn_data_record(ddtokp.get(), tid);
+// }
+// } catch (const journal::jexception& e) {
+// THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
+// }
+// switch (dres)
+// {
+// case rhm::journal::RHM_IORES_SUCCESS:
+// //std::cout << "." << std::flush;
+// aio_sleep_cnt = 0;
+// busy_sleep_cnt = 0;
+// written = true;
+// break;
+// case rhm::journal::RHM_IORES_AIO_WAIT:
+// //std::cout << "w" << std::flush;
+// if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+// THROW_STORE_EXCEPTION("Timeout waiting for AIO: RHM_IORES_AIO_WAIT");
+// usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+// jc->get_wr_events();
+// break;
+// case rhm::journal::RHM_IORES_BUSY:
+// //std::cout << "b" << std::flush;
+// if (++busy_sleep_cnt > MAX_AIO_SLEEPS)
+// THROW_STORE_EXCEPTION("Timeout waiting for mutex: RHM_IORES_BUSY");
+// usleep(AIO_SLEEP_TIME); // TODO add sleep time to get events call as option
+// break;
+// case rhm::journal::RHM_IORES_FULL:
+// std::cerr << "Dequeue: Error storing record -- Journal full on queue \"" << queue.getName() << "\"." << std::endl << std::flush;
+// THROW_STORE_FULL_EXCEPTION("Dequeue: Error storing record -- Journal full on queue \"" + queue.getName() + "\".");
+// break;
+// default:
+// assert("Store Error: Unexpected msg state");
+// }
+// }
}
bool BdbMessageStore::dequeue(DbTxn* txn, Dbt& messageId, Dbt& queueId)
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-12-14 16:27:22 UTC (rev 1494)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-12-14 19:41:15 UTC (rev 1495)
@@ -25,6 +25,7 @@
#include "jrnl/jerrno.hpp"
#include "jrnl/jexception.hpp"
+#include "StoreException.h"
#include <qpid/sys/Monitor.h>
using namespace rhm::bdbstore;
@@ -49,6 +50,7 @@
getEventsTimerSetFlag(false),
writeActivityFlag(false),
flushTriggeredFlag(true),
+ aioWait(false),
_xidp(0),
_datap(0),
_dlen(0),
@@ -168,69 +170,67 @@
return true;
}
-const iores
+void
JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const bool transient)
{
-//std::cout << " " << _jid << ":E" << std::flush;
- return handleInactivityTimer(jcntl::enqueue_data_record(data_buff, tot_data_len,
- this_data_len, dtokp, transient));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ while(handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp,
+ transient)));
}
-const iores
+void
JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp,
const bool transient)
{
-//std::cout << " " << _jid << ":E-ext" << std::flush;
- return handleInactivityTimer(jcntl::enqueue_extern_data_record(tot_data_len, dtokp,
- transient));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ while(handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient)));
}
-const iores
+void
JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, data_tok* dtokp, const std::string& xid,
- const bool transient)
+ const size_t this_data_len, data_tok* dtokp, const std::string& xid, const bool transient)
{
-//std::cout << " " << _jid << ":E-tx" << std::flush;
- return handleInactivityTimer(jcntl::enqueue_txn_data_record(data_buff, tot_data_len,
- this_data_len, dtokp, xid, transient));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ while(handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len,
+ dtokp, xid, transient)));
}
-const iores
+void
JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
const std::string& xid, const bool transient)
{
-//std::cout << " " << _jid << ":E-tx-ext" << std::flush;
- return handleInactivityTimer(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid,
- transient));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ while(handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid,
+ transient)));
}
-const iores
+void
JournalImpl::dequeue_data_record(data_tok* const dtokp)
{
-//std::cout << " " << _jid << ":D" << std::flush;
- return handleInactivityTimer(jcntl::dequeue_data_record(dtokp));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ while(handleIoResult(jcntl::dequeue_data_record(dtokp)));
}
-const iores
+void
JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid)
{
-//std::cout << " " << _jid << ":D-tx" << std::flush;
- return handleInactivityTimer(jcntl::dequeue_txn_data_record(dtokp, xid));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ while(handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid)));
}
-const iores
+void
JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
{
-//std::cout << " " << _jid << ":A-tx" << std::flush;
- return handleInactivityTimer(jcntl::txn_abort(dtokp, xid));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ while(handleIoResult(jcntl::txn_abort(dtokp, xid)));
}
-const iores
+void
JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid)
{
-//std::cout << " " << _jid << ":C-tx" << std::flush;
- return handleInactivityTimer(jcntl::txn_commit(dtokp, xid));
+ qpid::sys::ScopedLock<qpid::sys::Monitor> s(writeLock);
+ while(handleIoResult(jcntl::txn_commit(dtokp, xid)));
}
void
@@ -289,11 +289,42 @@
journalTimer.add(inactivityFireEventPtr);
}
-const iores
-JournalImpl::handleInactivityTimer(const iores r)
+const bool
+JournalImpl::handleIoResult(const iores r)
{
writeActivityFlag = true;
- return r;
+ switch (r)
+ {
+ case rhm::journal::RHM_IORES_SUCCESS:
+ return false;
+ case rhm::journal::RHM_IORES_AIO_WAIT:
+std::cout << "W " << std::flush;
+ aioWait = true;
+ writeLock.wait();
+ usleep(1000);
+ return true;
+ case rhm::journal::RHM_IORES_ENQCAPTHRESH:
+ {
+ std::ostringstream oss;
+ oss << "Enqueue capacity threshold exceeded on queue \"" << _jid << "\".";
+ std::cerr << oss.str() << std::endl << std::flush;
+ THROW_STORE_FULL_EXCEPTION(oss.str());
+ }
+ case rhm::journal::RHM_IORES_FULL:
+ {
+ std::ostringstream oss;
+ oss << "Journal full on queue \"" << _jid << "\".";
+ std::cerr << oss.str() << std::endl << std::flush;
+ THROW_STORE_FULL_EXCEPTION(oss.str());
+ }
+ default:
+ {
+ std::ostringstream oss;
+ oss << "Unexpected I/O response (" << rhm::journal::iores_str(r) << ") on queue " << _jid << "\".";
+ std::cerr << oss.str() << std::endl << std::flush;
+ THROW_STORE_FULL_EXCEPTION(oss.str());
+ }
+ }
}
void
@@ -329,6 +360,7 @@
dtokp->release();
this_dtok_list.pop_front();
}
+ jip->notifyWriteMonitor();
}
void
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-12-14 16:27:22 UTC (rev 1494)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-12-14 19:41:15 UTC (rev 1495)
@@ -74,6 +74,9 @@
bool writeActivityFlag;
bool flushTriggeredFlag;
qpid::intrusive_ptr<qpid::broker::TimerTask> inactivityFireEventPtr;
+
+ qpid::sys::Monitor writeLock;
+ bool aioWait;
// temp local vars for loadMsgContent below
void* _xidp;
@@ -118,29 +121,28 @@
size_t length);
// Overrides for write inactivity timer
- const journal::iores enqueue_data_record(const void* const data_buff,
- const size_t tot_data_len, const size_t this_data_len, journal::data_tok* dtokp,
+ void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, journal::data_tok* dtokp,
const bool transient = false);
- const journal::iores enqueue_extern_data_record(const size_t tot_data_len,
- journal::data_tok* dtokp, const bool transient = false);
+ void enqueue_extern_data_record(const size_t tot_data_len, journal::data_tok* dtokp,
+ const bool transient = false);
- const journal::iores enqueue_txn_data_record(const void* const data_buff,
- const size_t tot_data_len, const size_t this_data_len, journal::data_tok* dtokp,
+ void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, journal::data_tok* dtokp, const std::string& xid,
+ const bool transient = false);
+
+ void enqueue_extern_txn_data_record(const size_t tot_data_len, journal::data_tok* dtokp,
const std::string& xid, const bool transient = false);
- const journal::iores enqueue_extern_txn_data_record(const size_t tot_data_len,
- journal::data_tok* dtokp, const std::string& xid, const bool transient = false);
+ void dequeue_data_record(journal::data_tok* const dtokp);
- const journal::iores dequeue_data_record(journal::data_tok* const dtokp);
+ void dequeue_txn_data_record(journal::data_tok* const dtokp, const std::string& xid);
- const journal::iores dequeue_txn_data_record(journal::data_tok* const dtokp,
- const std::string& xid);
+ void txn_abort(journal::data_tok* const dtokp, const std::string& xid);
- const journal::iores txn_abort(journal::data_tok* const dtokp, const std::string& xid);
+ void txn_commit(journal::data_tok* const dtokp, const std::string& xid);
- const journal::iores txn_commit(journal::data_tok* const dtokp, const std::string& xid);
-
void stop(bool block_till_aio_cmpl = false);
// Overrides for get_events timer
@@ -149,9 +151,12 @@
// TimerTask callback
void getEventsFire();
void flushFire();
+
+ // Notify write monitor
+ inline void notifyWriteMonitor() { if (aioWait) { aioWait = false; writeLock.notify(); } }
private:
- const journal::iores handleInactivityTimer(const journal::iores r);
+ const bool handleIoResult(const journal::iores r);
static void aio_wr_callback(jcntl* journal, u_int32_t num_dtoks);
static void aio_rd_callback(jcntl* journal, u_int32_t num_dtoks);
}; // class JournalImpl
Modified: store/trunk/cpp/lib/StoreException.h
===================================================================
--- store/trunk/cpp/lib/StoreException.h 2007-12-14 16:27:22 UTC (rev 1494)
+++ store/trunk/cpp/lib/StoreException.h 2007-12-14 19:41:15 UTC (rev 1495)
@@ -24,6 +24,9 @@
#ifndef _StoreException_
#define _StoreException_
+#include "IdDbt.h"
+#include <boost/format.hpp>
+
namespace rhm{
namespace bdbstore{
rhmessaging commits: r1494 - in store/trunk/cpp: tests and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: gordonsim
Date: 2007-12-14 11:27:22 -0500 (Fri, 14 Dec 2007)
New Revision: 1494
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/tests/SimpleTest.cpp
Log:
Pass in PersistableMessage by reference for destroy(), as it is called while the message is being destructed and only the id is checked.
Remove the BDB enqueue/dequeue calls for released content in journal mode.
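The reasoning behind the signature change, sketched here with stand-in types rather than the real qpid classes: destroy() runs while the message is being destructed, so the store can no longer be handed a smart-pointer reference to it, and a plain reference is sufficient because only the persistence id is read.

// Sketch of the interface change only; these types are stand-ins, not the qpid classes.
#include <cstdint>

struct PersistableMessage {
    virtual ~PersistableMessage() {}
    virtual std::uint64_t getPersistenceId() const = 0;
};

struct MessageStoreSketch {
    // before: void destroy(boost::intrusive_ptr<PersistableMessage>& msg);
    void destroy(PersistableMessage& msg) {
        std::uint64_t messageId = msg.getPersistenceId();
        if (messageId) {
            // ... delete the BDB record keyed on messageId ...
        }
    }
};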
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-14 16:06:50 UTC (rev 1493)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-14 16:27:22 UTC (rev 1494)
@@ -789,10 +789,10 @@
}
}
}
-void BdbMessageStore::destroy(intrusive_ptr<PersistableMessage>& msg)
+void BdbMessageStore::destroy(PersistableMessage& msg)
{
checkInit();
- u_int64_t messageId (msg->getPersistenceId());
+ u_int64_t messageId (msg.getPersistenceId());
if (messageId) {
Dbt key (&messageId, sizeof(messageId));
TxnCtxt txn;
@@ -954,7 +954,6 @@
if (usingJrnl()){
// add queue* to the txn map..
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
- if (msg->isContentReleased()) put(mappingDb, txn->get(), key, value); // TODO - remove once jrnl is used for transient policy see **
}else{
msg->enqueueComplete(); // set enqueued for ack
put(mappingDb, txn->get(), key, value);
@@ -1095,13 +1094,6 @@
// add queue* to the txn map..
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
async_dequeue(ctxt, msg, queue);
- // added here as we are not doing it async on call back
- if (msg->isContentReleased()) // TODO remove this code once jrnl is used for transient policy see **
- {
- Dbt key (&messageId, sizeof(messageId));
- Dbt value (&queueId, sizeof(queueId));
- dequeue(txn->get(), key, value);
- }
msg->dequeueComplete();
// if ( msg->isDequeueComplete() ) // clear id after last dequeue
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-12-14 16:06:50 UTC (rev 1493)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-12-14 16:27:22 UTC (rev 1494)
@@ -164,7 +164,7 @@
void recover(qpid::broker::RecoveryManager& queues);
void stage(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
- void destroy(boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
+ void destroy(qpid::broker::PersistableMessage& msg);
void appendContent(boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg, const std::string& data);
void loadContent(const qpid::broker::PersistableQueue& queue,
boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2007-12-14 16:06:50 UTC (rev 1493)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2007-12-14 16:27:22 UTC (rev 1494)
@@ -435,7 +435,7 @@
MessageUtils::addContent(msg, data);
store.stage(pmsg);
- store.destroy(pmsg);
+ store.destroy(*pmsg);
try {
string loaded;
@@ -464,7 +464,7 @@
store.create(queue);
store.enqueue(0, pmsg, queue);
- store.destroy(pmsg);
+ store.destroy(*pmsg);
string loaded;
store.loadContent(queue, cpmsg, loaded, 0, data.length());
rhmessaging commits: r1493 - in mgmt: notes and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-14 11:06:50 -0500 (Fri, 14 Dec 2007)
New Revision: 1493
Modified:
mgmt/cumin/python/cumin/client.py
mgmt/cumin/python/cumin/client.strings
mgmt/cumin/python/cumin/model.py
mgmt/notes/justin-todo.txt
Log:
Fixes client ajax.
Makes xml generation from ui metadata simpler.
Makes names in client statistics history consistent.
Fixes a crashing chart.
Modified: mgmt/cumin/python/cumin/client.py
===================================================================
--- mgmt/cumin/python/cumin/client.py 2007-12-14 15:49:19 UTC (rev 1492)
+++ mgmt/cumin/python/cumin/client.py 2007-12-14 16:06:50 UTC (rev 1493)
@@ -189,11 +189,11 @@
def get_title(self, session, client):
return "History"
- def render_produced_chart_url(self, session, client):
+ def render_sent_chart_url(self, session, client):
return "client.png?id=%i;s=framesFromClient;s=bytesFromClient" \
% client.id
- def render_consumed_chart_url(self, session, client):
+ def render_received_chart_url(self, session, client):
return "client.png?id=%i;s=framesToClient;s=bytesToClient" \
% client.id
Modified: mgmt/cumin/python/cumin/client.strings
===================================================================
--- mgmt/cumin/python/cumin/client.strings 2007-12-14 15:49:19 UTC (rev 1492)
+++ mgmt/cumin/python/cumin/client.strings 2007-12-14 16:06:50 UTC (rev 1493)
@@ -138,19 +138,19 @@
</script>
[StatisticsHistory.html]
-<h2>Produced</h2>
+<h2>Sent</h2>
<div class="iblock chart">
- <img id="{id}.produced" src="{produced_chart_url}"/>
+ <img id="{id}.sent" src="{sent_chart_url}"/>
</div>
-<h2>Consumed</h2>
+<h2>Received</h2>
<div class="iblock chart">
- <img id="{id}.consumed" src="{consumed_chart_url}"/>
+ <img id="{id}.received" src="{received_chart_url}"/>
</div>
<script>
- cumin.client.listeners["{id}.produced"] = updateImage
- cumin.client.listeners["{id}.consumed"] = updateImage
+ cumin.client.listeners["{id}.sent"] = updateImage
+ cumin.client.listeners["{id}.received"] = updateImage
</script>
[ClientSessionSet.html]
Modified: mgmt/cumin/python/cumin/model.py
===================================================================
--- mgmt/cumin/python/cumin/model.py 2007-12-14 15:49:19 UTC (rev 1492)
+++ mgmt/cumin/python/cumin/model.py 2007-12-14 16:06:50 UTC (rev 1493)
@@ -40,6 +40,9 @@
if stat.name == name:
return stat
+ def get_object_name(self, object):
+ return object.name
+
def write_event_xml(self, object, writer):
writer.write("<events errors=\"%i\" warnings=\"%i\"/>" % (0, 0))
@@ -51,6 +54,16 @@
for stat in self.stats:
stat.write_xml(object, writer)
+ def write_xml(self, object, writer):
+ writer.write("<%s id=\"%i\" name=\"%s\">" % \
+ (self.name, object.id, self.get_object_name(object)))
+
+ self.write_event_xml(object, writer)
+ self.write_action_xml(object, writer)
+ self.write_stat_xml(object, writer)
+
+ writer.write("</%s>" % self.name)
+
class CuminStat(object):
def __init__(self, cls, name, type):
self.model = cls.model
@@ -84,7 +97,7 @@
for stat in stats:
time = getattr(stat, "recTime")
- value = getattr(stat, self.name)
+ value = getattr(stat, self.name, 0)
samples.append((time, value))
return samples
@@ -111,7 +124,7 @@
def write_xml(self, object, writer):
writer.write("<stat name=\"%s\" value=\"%i\" rate=\"%i\"/>" \
% (self.name,
- self.value(object) or 0,
+ self.value(object) or 0, #XXX need a null value
self.rate(object) or 0))
class CuminQueue(CuminClass):
@@ -270,15 +283,6 @@
stat.unit = "byte"
stat.categories = ("byte", "persistent")
- def write_xml(self, queue, writer):
- writer.write("<queue id=\"%i\" name=\"%s\">" % (queue.id, queue.name))
-
- self.write_event_xml(queue, writer)
- self.write_action_xml(queue, writer)
- self.write_stat_xml(queue, writer)
-
- writer.write("</queue>")
-
class CuminExchange(CuminClass):
def __init__(self, model):
super(CuminExchange, self).__init__(model, "exchange", Exchange)
@@ -327,16 +331,6 @@
stat.unit = "message"
stat.categories = ("general")
- def write_xml(self, exchange, writer):
- writer.write("<exchange id=\"%i\" name=\"%s\">" % \
- (exchange.id, exchange.name))
-
- self.write_event_xml(exchange, writer)
- self.write_action_xml(exchange, writer)
- self.write_stat_xml(exchange, writer)
-
- writer.write("</exchange>")
-
class CuminBinding(CuminClass):
def __init__(self, model):
super(CuminBinding, self).__init__(model, "binding", Binding)
@@ -348,6 +342,9 @@
stat.unit = "message"
stat.categories = ("general")
+ def get_object_name(self, binding):
+ return ""
+
class CuminClient(CuminClass):
def __init__(self, model):
super(CuminClient, self).__init__(model, "client", Client)
@@ -374,6 +371,9 @@
stat.unit = "frame"
stat.categories = ("general")
+ def get_object_name(self, client):
+ return client.address
+
class CuminSession(CuminClass):
def __init__(self, model):
super(CuminSession, self).__init__(model, "session", Session)
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2007-12-14 15:49:19 UTC (rev 1492)
+++ mgmt/notes/justin-todo.txt 2007-12-14 16:06:50 UTC (rev 1493)
@@ -42,8 +42,6 @@
- Need to handle exceptions in broker connect thread, so it doesn't
stop trying
- * Fix client ajax
-
Deferred
* Add an edit form for broker registrations so you can change their
rhmessaging commits: r1492 - in mgmt: notes and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-14 10:49:19 -0500 (Fri, 14 Dec 2007)
New Revision: 1492
Modified:
mgmt/cumin/python/cumin/client.py
mgmt/cumin/python/cumin/client.strings
mgmt/notes/justin-todo.txt
Log:
Removes auth id from client properties, as we have no data for it.
Modified: mgmt/cumin/python/cumin/client.py
===================================================================
--- mgmt/cumin/python/cumin/client.py 2007-12-14 15:46:56 UTC (rev 1491)
+++ mgmt/cumin/python/cumin/client.py 2007-12-14 15:49:19 UTC (rev 1492)
@@ -154,9 +154,6 @@
def render_address(self, session, client):
return client.address
- def render_auth_id(self, session, client):
- return "e50e7dcaa8d6a039a" #XXX get rid of this
-
def render_created_deleted(self, session, client):
return "%s – %s" % (fmt_datetime(client.creationTime),
fmt_datetime(client.deletionTime))
Modified: mgmt/cumin/python/cumin/client.strings
===================================================================
--- mgmt/cumin/python/cumin/client.strings 2007-12-14 15:46:56 UTC (rev 1491)
+++ mgmt/cumin/python/cumin/client.strings 2007-12-14 15:49:19 UTC (rev 1492)
@@ -106,7 +106,6 @@
<table class="props">
<tr><th>Address</th><td>{address}</td></tr>
- <tr><th>Auth. ID</th><td>{auth_id}</td></tr>
<tr><th>Created – Deleted</th><td>{created_deleted}</td></tr>
<tr><th>Updated</th><td>{updated}</td></tr>
<tr>
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2007-12-14 15:46:56 UTC (rev 1491)
+++ mgmt/notes/justin-todo.txt 2007-12-14 15:49:19 UTC (rev 1492)
@@ -44,8 +44,6 @@
* Fix client ajax
- * Remove auth id for now
-
Deferred
* Add an edit form for broker registrations so you can change their
rhmessaging commits: r1491 - mgmt/notes.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-14 10:46:56 -0500 (Fri, 14 Dec 2007)
New Revision: 1491
Modified:
mgmt/notes/justin-todo.txt
Log:
Todo updates.
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2007-12-14 15:37:46 UTC (rev 1490)
+++ mgmt/notes/justin-todo.txt 2007-12-14 15:46:56 UTC (rev 1491)
@@ -1,31 +1,47 @@
Current
- * "purge messages from queues"
+ * Bulk actions
- * Add legends to charts
+ - Add javascript for the check-all behavior
- * Add a groups column to the browser broker list
+ - "purge messages from queues"
+ - "unregister brokers"
+
+ * Improve charts
+
+ - Add legends to charts
+
+ - Change the way we plot values; currently it's very stupid
+
+ - Display current values on right-side and according to color
+
+ - Add y-axis ticks and values for reference
+
+ * Broker groups
+
+ - Add a groups column to the browser broker list
+
+ - Fix initial group box in broker register form, or remove it
+
+ - Group form submit has different behaviors between hitting enter
+ and clicking submit
+
* Sort in tables
* Render stats without values as something other than 0, say a --
- * Add javascript for the check-all behavior
+ * Mgmtd-broker interaction
- * Deal with problem of calling method on broker that is not there
+ - Deal with problem of calling method on broker that is not there
- * Only put something in pending actions if the call succeeds
+ - Only put something in pending actions if the call succeeds
- * Fix initial group box in broker register form, or remove it
+ - Handle other exception conditions on broker connect more gracefully
- * Handle other exception conditions on broker connect more gracefully
+ - Need to handle exceptions in broker connect thread, so it doesn't
+ stop trying
- * Need to handle exceptions in broker connect thread, so it doesn't
- stop trying
-
- * Group form submit has different behaviors between hitting enter and
- clicking submit
-
* Fix client ajax
* Remove auth id for now
rhmessaging commits: r1490 - in mgmt: notes and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-14 10:37:46 -0500 (Fri, 14 Dec 2007)
New Revision: 1490
Modified:
mgmt/cumin/python/cumin/client.py
mgmt/cumin/python/cumin/widgets.py
mgmt/cumin/python/cumin/widgets.strings
mgmt/notes/justin-todo.txt
Log:
Adds a new widget StateSwitch, for producing unit radio buttons of
various sorts. Reimplements UnitSwitch using it.
Adds a frames-vs-bytes state switch to ClientSet.
Modified: mgmt/cumin/python/cumin/client.py
===================================================================
--- mgmt/cumin/python/cumin/client.py 2007-12-14 14:54:01 UTC (rev 1489)
+++ mgmt/cumin/python/cumin/client.py 2007-12-14 15:37:46 UTC (rev 1490)
@@ -13,14 +13,16 @@
def __init__(self, app, name):
super(ClientSet, self).__init__(app, name)
- self.unit = UnitSwitch(app, "unit")
+ self.unit = StateSwitch(app, "unit")
+ self.unit.add_state("f", "Frames")
+ self.unit.add_state("b", "Bytes")
self.add_child(self.unit)
def get_title(self, session, vhost):
return "Clients %s" % fmt_count(self.get_item_count(session, vhost))
def render_unit_plural(self, session, vhost):
- return self.unit.get(session) == "b" and "Bytes" or "Msgs."
+ return self.unit.get(session) == "b" and "Bytes" or "Frames"
def get_item_count(self, session, vhost):
return Client.select(Client.q.vhostID == vhost.id).count()
Modified: mgmt/cumin/python/cumin/widgets.py
===================================================================
--- mgmt/cumin/python/cumin/widgets.py 2007-12-14 14:54:01 UTC (rev 1489)
+++ mgmt/cumin/python/cumin/widgets.py 2007-12-14 15:37:46 UTC (rev 1490)
@@ -240,35 +240,47 @@
return writer.to_string()
-class UnitSwitch(Widget):
+class StateSwitch(ItemSet):
def __init__(self, app, name):
- super(UnitSwitch, self).__init__(app, name)
+ super(StateSwitch, self).__init__(app, name)
self.param = Parameter(app, "param")
- self.param.set_default("m")
self.add_parameter(self.param)
+ self.states = list()
+ self.titles = dict()
+
+ def add_state(self, state, title):
+ self.states.append(state)
+ self.titles[state] = title
+
+ if self.param.default is None:
+ self.param.set_default(state)
+
def get(self, session):
return self.param.get(session)
def set(self, session, value):
return self.param.set(session, value)
- def render_messages_link(self, session, vhost):
+ def get_items(self, session, object):
+ return self.states
+
+ def render_item_link(self, session, state):
branch = session.branch()
- self.set(branch, "m")
+ self.set(branch, state)
- class_ = self.get(session) == "m" and "selected"
+ title = self.titles[state]
+ class_ = self.get(session) == state and "selected"
- return fmt_link(branch.marshal(), "Messages", class_)
+ return fmt_link(branch.marshal(), title, class_)
- def render_bytes_link(self, session, vhost):
- branch = session.branch()
- self.set(branch, "b")
+class UnitSwitch(StateSwitch):
+ def __init__(self, app, name):
+ super(UnitSwitch, self).__init__(app, name)
- class_ = self.get(session) == "b" and "selected"
-
- return fmt_link(branch.marshal(), "Bytes", class_)
+ self.add_state("m", "Messages")
+ self.add_state("b", "Bytes")
class Paginator(ItemSet):
def __init__(self, app, name):
Modified: mgmt/cumin/python/cumin/widgets.strings
===================================================================
--- mgmt/cumin/python/cumin/widgets.strings 2007-12-14 14:54:01 UTC (rev 1489)
+++ mgmt/cumin/python/cumin/widgets.strings 2007-12-14 15:37:46 UTC (rev 1490)
@@ -52,12 +52,14 @@
<div>{actions_pending}</div>
</div>
-[UnitSwitch.html]
+[StateSwitch.html]
<ul class="radiotabs">
- <li>{messages_link}</li>
- <li>{bytes_link}</li>
+ {items}
</ul>
+[StateSwitch.item_html]
+<li>{item_link}</li>
+
[Paginator.css]
div.Paginator {
margin: 0 0 0.5em 0;
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2007-12-14 14:54:01 UTC (rev 1489)
+++ mgmt/notes/justin-todo.txt 2007-12-14 15:37:46 UTC (rev 1490)
@@ -30,8 +30,6 @@
* Remove auth id for now
- * Unit for clients should be frames vs. bytes
-
Deferred
* Add an edit form for broker registrations so you can change their
rhmessaging commits: r1489 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-14 09:54:01 -0500 (Fri, 14 Dec 2007)
New Revision: 1489
Modified:
mgmt/cumin/python/cumin/broker.py
Log:
Use broker registration name, not port.
Modified: mgmt/cumin/python/cumin/broker.py
===================================================================
--- mgmt/cumin/python/cumin/broker.py 2007-12-14 14:47:46 UTC (rev 1488)
+++ mgmt/cumin/python/cumin/broker.py 2007-12-14 14:54:01 UTC (rev 1489)
@@ -94,7 +94,7 @@
def render_item_link(self, session, broker):
branch = session.branch()
self.page().show_broker(branch, broker).show_view(branch)
- return fmt_olink(branch, broker, name=broker.port)
+ return fmt_olink(branch, broker)
def render_item_group_link(self, session, broker):
group = None #broker.get_broker_group()
rhmessaging commits: r1488 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-12-14 09:47:46 -0500 (Fri, 14 Dec 2007)
New Revision: 1488
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
Log:
Undid the last checkin's mutex; taking a single lock across all queues kills performance.
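A minimal sketch of the trade-off the log message describes (simplified names, not the committed code): a single store-wide jrnlWriteLock serialises enqueues on unrelated queues, whereas a lock owned by each queue's journal, the direction r1495 later takes with a Monitor inside JournalImpl, only serialises writers on the same queue.

// Sketch only: contrasts the two lock granularities; all types here are stand-ins.
#include <cstddef>
#include <mutex>

struct JournalSketch {
    std::mutex writeLock;                           // one lock per queue's journal
    void enqueue(const void* /*buff*/, std::size_t /*size*/) {
        std::lock_guard<std::mutex> s(writeLock);   // blocks only writers on this queue
        // ... submit the record to this queue's journal ...
    }
};

struct StoreWideLockingSketch {                     // the reverted approach
    std::mutex jrnlWriteLock;                       // shared by every queue in the store
    void enqueue(JournalSketch& jc, const void* buff, std::size_t size) {
        std::lock_guard<std::mutex> s(jrnlWriteLock);   // blocks writers on all queues
        jc.enqueue(buff, size);
    }
};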
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-14 14:44:56 UTC (rev 1487)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-14 14:47:46 UTC (rev 1488)
@@ -992,7 +992,6 @@
try {
if ( queue && usingJrnl()) {
- qpid::sys::Mutex::ScopedLock s(jrnlWriteLock);
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
dtokp->setSourceMessage(message);
@@ -1136,7 +1135,6 @@
intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue)
{
- qpid::sys::Mutex::ScopedLock s(jrnlWriteLock);
bool written = false;
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
ddtokp->addRef();
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-12-14 14:44:56 UTC (rev 1487)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-12-14 14:47:46 UTC (rev 1488)
@@ -84,7 +84,6 @@
u_int16_t numJrnlFiles;
u_int32_t jrnlFsizePgs;
bool isInit;
- mutable qpid::sys::Mutex jrnlWriteLock;
const char* envPath;
static qpid::sys::Duration defJournalGetEventsTimeout;
static qpid::sys::Duration defJournalFlushTimeout;
rhmessaging commits: r1487 - in mgmt: notes and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-12-14 09:44:56 -0500 (Fri, 14 Dec 2007)
New Revision: 1487
Modified:
mgmt/cumin/python/cumin/__init__.py
mgmt/cumin/python/cumin/broker.py
mgmt/notes/justin-todo.txt
Log:
Fixes a problem where the broker registration form would hang trying
to connect to added brokers. Now it simply stores the new
registrations and tells the broker connect thread that there have been
updates.
Modified: mgmt/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/cumin/python/cumin/__init__.py 2007-12-14 14:05:58 UTC (rev 1486)
+++ mgmt/cumin/python/cumin/__init__.py 2007-12-14 14:44:56 UTC (rev 1487)
@@ -9,7 +9,7 @@
from mint import *
from sqlobject.main import *
from time import sleep
-from threading import Thread
+from threading import Thread, Event
from model import CuminModel
from demo import DemoData
@@ -31,7 +31,8 @@
self.model = CuminModel()
- BrokerConnectThread(self.model).start()
+ self.broker_connect_thread = BrokerConnectThread(self.model)
+ self.broker_connect_thread.start()
self.cumin_page = CuminPage(self, "cumin.html")
self.set_default_page(self.cumin_page)
@@ -61,6 +62,12 @@
self.model = model
self.setDaemon(True)
+ self.event = Event()
+
+ def prompt(self):
+ self.event.set()
+ self.event.clear()
+
def run(self):
while True:
for reg in BrokerRegistration.select():
@@ -78,4 +85,4 @@
except socket.error:
print "Connection failed"
- sleep(15)
+ self.event.wait(30)
Modified: mgmt/cumin/python/cumin/broker.py
===================================================================
--- mgmt/cumin/python/cumin/broker.py 2007-12-14 14:05:58 UTC (rev 1486)
+++ mgmt/cumin/python/cumin/broker.py 2007-12-14 14:44:56 UTC (rev 1487)
@@ -665,7 +665,7 @@
reg.name = names[i]
- self.app.model.data.connectToBroker(reg.host, reg.port or 5672)
+ self.app.broker_connect_thread.prompt()
self.process_cancel(session, model)
Modified: mgmt/notes/justin-todo.txt
===================================================================
--- mgmt/notes/justin-todo.txt 2007-12-14 14:05:58 UTC (rev 1486)
+++ mgmt/notes/justin-todo.txt 2007-12-14 14:44:56 UTC (rev 1487)
@@ -23,8 +23,6 @@
* Need to handle exceptions in broker connect thread, so it doesn't
stop trying
- * Fix session and client naming
-
* Group form submit has different behaviors between hitting enter and
clicking submit
@@ -36,6 +34,9 @@
Deferred
+ * Add an edit form for broker registrations so you can change their
+ names
+
* Add inactive state to some status lights
* Paginate producers