[rhmessaging-commits] rhmessaging commits: r1424 - store/trunk/cpp/lib.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Tue Dec 4 15:22:40 EST 2007
Author: gordonsim
Date: 2007-12-04 15:22:39 -0500 (Tue, 04 Dec 2007)
New Revision: 1424
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/TxnCtxt.h
Log:
Hack to serialise all bdb store transactions to avoid deadlocking.
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-04 19:30:24 UTC (rev 1423)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-12-04 20:22:39 UTC (rev 1424)
@@ -84,7 +84,7 @@
try {
env.open(bdbdir.c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
- txn.begin(env);
+ txn.begin(env, false);
open(queueDb, txn.get(), "queues.db", false);
open(configDb, txn.get(), "config.db", false);
open(exchangeDb, txn.get(), "exchanges.db", false);
@@ -291,7 +291,7 @@
IdDbt key(e.getPersistenceId());
BindingDbt value(e, q, k, a);
TxnCtxt txn;
- txn.begin(env);
+ txn.begin(env, true);
put(bindingDb, txn.get(), key, value);
txn.commit();
}
@@ -304,7 +304,7 @@
BindingDbt value(e, q, k, a);
TxnCtxt txn;
- txn.begin(env);
+ txn.begin(env, true);
if (deleteKeyValuePair(bindingDb, txn.get(), key, value)) {
txn.commit();
@@ -325,7 +325,7 @@
message_index messages;//id->message
TxnCtxt txn;
- txn.begin(env);
+ txn.begin(env, false);
try {
//read all queues, calls recoversMessages
recoverQueues(txn, registry, queues, prepared, messages);
@@ -729,7 +729,7 @@
{
checkInit();
TxnCtxt txn;
- txn.begin(env);
+ txn.begin(env, true);
u_int64_t messageId (msg->getPersistenceId());
if (messageId == 0 || !msg->isContentReleased()) {
@@ -752,7 +752,7 @@
if (messageId) {
Dbt key (&messageId, sizeof(messageId));
TxnCtxt txn;
- txn.begin(env);
+ txn.begin(env, true);
try {
deleteIfUnused(txn.get(), key);
txn.commit();
@@ -881,7 +881,7 @@
txn = check(ctxt);
} else {
txn = &implicit;
- txn->begin(env);
+ txn->begin(env, !usingJrnl());
}
try {
@@ -1026,7 +1026,7 @@
txn = check(ctxt);
} else {
txn = &implicit;
- txn->begin(env);
+ txn->begin(env, !usingJrnl());
}
try {
@@ -1211,27 +1211,27 @@
void BdbMessageStore::completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit)
{
- if (!txn.get()) txn.begin(env);
+ if (!txn.get()) txn.begin(env, !usingJrnl());
try {
StringDbt key(txn.getXid());
if (!usingJrnl()){
- //scroll through all records matching xid in apply and dequeue
- //using the message and queue id encoded in each value
- Cursor c;
- c.open(apply, txn.get());
- IdPairDbt value;
-
- for (int status = c->get(&key, &value, DB_SET); status == 0; status = c->get(&key, &value, DB_NEXT_DUP)) {
+ //scroll through all records matching xid in apply and dequeue
+ //using the message and queue id encoded in each value
+ Cursor c;
+ c.open(apply, txn.get());
+ IdPairDbt value;
+
+ for (int status = c->get(&key, &value, DB_SET); status == 0; status = c->get(&key, &value, DB_NEXT_DUP)) {
dequeue(txn.get(), value.message, value.queue);
- }
- c.close();
-
- //delete all records matching xid
- discard.del(txn.get(), &key, 0);
- apply.del(txn.get(), &key, 0);
- }
+ }
+ c.close();
+
+ //delete all records matching xid
+ discard.del(txn.get(), &key, 0);
+ apply.del(txn.get(), &key, 0);
+ }
prepareXidDb.del(txn.get(), &key, 0);
txn.complete(commit);
@@ -1247,7 +1247,7 @@
checkInit();
// pass sequence number for c/a when using jrnl
TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
- txn->begin(env);
+ txn->begin(env, !usingJrnl());
return auto_ptr<TransactionContext>(txn);
}
@@ -1259,7 +1259,7 @@
// pass sequence number for c/a when using jrnl
TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
- txn->begin(env);
+ txn->begin(env, !usingJrnl());
return auto_ptr<TPCTransactionContext>(txn);
}
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2007-12-04 19:30:24 UTC (rev 1423)
+++ store/trunk/cpp/lib/TxnCtxt.h 2007-12-04 20:22:39 UTC (rev 1424)
@@ -28,6 +28,7 @@
#include <qpid/broker/MessageStore.h>
#include <qpid/sys/Mutex.h>
#include <boost/shared_ptr.hpp>
+#include <memory>
#include <vector>
#include "JournalImpl.h"
#include "DataTokenImpl.h"
@@ -45,97 +46,105 @@
class TxnCtxt : public qpid::broker::TransactionContext
{
protected:
- typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
- ipqdef impactedQueues; // list of Queues used in the txn
+ typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
+ typedef std::auto_ptr<qpid::sys::Mutex::ScopedLock> AutoScopedLock;
+
+ static qpid::sys::Mutex globalSerialiser;
+
+ ipqdef impactedQueues; // list of Queues used in the txn
static unsigned int count;
mutable qpid::sys::Mutex Lock;
- IdSequence* loggedtx;
-
- unsigned int getCount() {
+ IdSequence* loggedtx;
+ AutoScopedLock globalHolder;
+
+ unsigned int getCount() {
qpid::sys::Mutex::ScopedLock locker(Lock);
- return ++count;
- }
- /**
- * local txn id, if non XA.
- */
- std::string tid;
- DbTxn* txn;
-
- void completeTXN(bool commit){
- for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
- JournalImpl* jc = static_cast<JournalImpl*>(*i);
- if (jc && loggedtx) { /* if using journal */
- boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
- dtokp->ref();
- dtokp->set_rid(loggedtx->next());
- try{
- if (commit)
- jc->txn_commit(dtokp.get(), getXid());
- else
- {
- jc->txn_abort(dtokp.get(), getXid());
+ return ++count;
+ }
+ /**
+ * local txn id, if non XA.
+ */
+ std::string tid;
+ DbTxn* txn;
+
+ void completeTXN(bool commit){
+ for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+ JournalImpl* jc = static_cast<JournalImpl*>(*i);
+ if (jc && loggedtx) { /* if using journal */
+ boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
+ dtokp->ref();
+ dtokp->set_rid(loggedtx->next());
+ try{
+ if (commit)
+ jc->txn_commit(dtokp.get(), getXid());
+ else
+ {
+ jc->txn_abort(dtokp.get(), getXid());
}
- } catch (const journal::jexception& e) {
-//std::cout << "Error commit" << e << std::endl;
- THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
- }
-
- }
- }
- sync();
- deleteXidRecord();
- }
-
+ } catch (const journal::jexception& e) {
+ //std::cout << "Error commit" << e << std::endl;
+ THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
+ }
+
+ }
+ }
+ sync();
+ deleteXidRecord();
+ }
+
public:
TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0) {
- if (loggedtx){ tid.assign( "rhm-tid"); tid+=getCount(); }
- }
+ if (loggedtx){ tid.assign( "rhm-tid"); tid+=getCount(); }
+ }
- /**
- * Call to make sure all the data for this txn is written to safe store
- *
- *@return if the data sucessfully synced.
- */
- void sync(){
- bool allWritten = false;
- bool firstloop = true;
- while (loggedtx && !allWritten){
+ /**
+ * Call to make sure all the data for this txn is written to safe store
+ *
+ *@return if the data sucessfully synced.
+ */
+ void sync(){
+ bool allWritten = false;
+ bool firstloop = true;
+ while (loggedtx && !allWritten){
if (!firstloop) ::usleep(AIO_SLEEP_TIME); // move this into the get events call aiolib..
- allWritten = true;
- for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
- JournalImpl* jc = static_cast<JournalImpl*>(*i);
-
- try
- {
- if (jc && !(jc->is_txn_synced(getXid())))
- {
- if (firstloop)
- jc->flush();
- allWritten = false;
- jc->get_wr_events();
- }
- } catch (const journal::jexception& e) {
-//std::cout << " TxnCtxt: Error sync 3" << e << std::flush;
- THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
- }
- }
- firstloop = false;
- }
- }
-
+ allWritten = true;
+ for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+ JournalImpl* jc = static_cast<JournalImpl*>(*i);
+
+ try
+ {
+ if (jc && !(jc->is_txn_synced(getXid())))
+ {
+ if (firstloop)
+ jc->flush();
+ allWritten = false;
+ jc->get_wr_events();
+ }
+ } catch (const journal::jexception& e) {
+ //std::cout << " TxnCtxt: Error sync 3" << e << std::flush;
+ THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
+ }
+ }
+ firstloop = false;
+ }
+ }
+
virtual ~TxnCtxt() { if(txn) abort(); }
- void begin(DbEnv& env){ env.txn_begin(0, &txn, 0); }
- void commit(){ txn->commit(0); completeTXN(true); txn = 0; }
- void abort(){ txn->abort(); completeTXN(false); txn = 0; }
+ void begin(DbEnv& env, bool sync = false){
+ env.txn_begin(0, &txn, 0);
+ if (sync) globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser));
+ }
+ void commit(){ txn->commit(0); completeTXN(true); txn = 0; globalHolder.reset(); }
+ void abort(){ txn->abort(); completeTXN(false); txn = 0; globalHolder.reset(); }
DbTxn* get(){ return txn; }
virtual bool isTPC() { return false; }
- virtual const std::string& getXid() { return tid; }
-
- void deleteXidRecord(){ impactedQueues.clear(); }
- void addXidRecord(qpid::broker::ExternalQueueStore* queue){
- impactedQueues.insert(queue); }
-
+ virtual const std::string& getXid() { return tid; }
+
+ void deleteXidRecord(){ impactedQueues.clear(); }
+ void addXidRecord(qpid::broker::ExternalQueueStore* queue) {
+ impactedQueues.insert(queue); }
+
};
class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
@@ -146,12 +155,15 @@
virtual bool isTPC() { return true; }
virtual const std::string& getXid() { return xid; }
// commit the BDB abort, abort commit the jnrl
- void commit(){ txn->commit(0); txn = 0; }
- void abort(){ txn->abort(); txn = 0; }
+ void commit(){ txn->commit(0); txn = 0; globalHolder.reset(); }
+ void abort(){ txn->abort(); txn = 0; globalHolder.reset(); }
void complete(bool commit){
- txn->commit(0); completeTXN(commit); txn = 0; }
+ txn->commit(0); completeTXN(commit); txn = 0;
+ }
};
+qpid::sys::Mutex TxnCtxt::globalSerialiser;
+
}}
#endif
More information about the rhmessaging-commits
mailing list