[rhmessaging-commits] rhmessaging commits: r1424 - store/trunk/cpp/lib.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Dec 4 15:22:40 EST 2007


Author: gordonsim
Date: 2007-12-04 15:22:39 -0500 (Tue, 04 Dec 2007)
New Revision: 1424

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/TxnCtxt.h
Log:
Hack to serialise all bdb store transactions to avoid deadlocking.



Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2007-12-04 19:30:24 UTC (rev 1423)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2007-12-04 20:22:39 UTC (rev 1424)
@@ -84,7 +84,7 @@
     try {
         env.open(bdbdir.c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON, 0);
 
-        txn.begin(env);
+        txn.begin(env, false);
         open(queueDb, txn.get(), "queues.db", false);
         open(configDb, txn.get(), "config.db", false);
         open(exchangeDb, txn.get(), "exchanges.db", false);
@@ -291,7 +291,7 @@
     IdDbt key(e.getPersistenceId());    
     BindingDbt value(e, q, k, a);
     TxnCtxt txn;
-    txn.begin(env);
+    txn.begin(env, true);
     put(bindingDb, txn.get(), key, value);
     txn.commit();
 }
@@ -304,7 +304,7 @@
     BindingDbt value(e, q, k, a);
 
     TxnCtxt txn;
-    txn.begin(env);
+    txn.begin(env, true);
 
     if (deleteKeyValuePair(bindingDb, txn.get(), key, value)) {
         txn.commit();
@@ -325,7 +325,7 @@
     message_index messages;//id->message
 
     TxnCtxt txn;
-    txn.begin(env);
+    txn.begin(env, false);
     try {
         //read all queues, calls recoversMessages
         recoverQueues(txn, registry, queues, prepared, messages);
@@ -729,7 +729,7 @@
 {
 	checkInit();
     TxnCtxt txn;
-    txn.begin(env);
+    txn.begin(env, true);
 
     u_int64_t messageId (msg->getPersistenceId());
     if (messageId == 0 || !msg->isContentReleased()) {
@@ -752,7 +752,7 @@
     if (messageId) {
         Dbt key (&messageId, sizeof(messageId));
         TxnCtxt txn;
-        txn.begin(env);
+        txn.begin(env, true);
         try {
             deleteIfUnused(txn.get(), key);
             txn.commit();
@@ -881,7 +881,7 @@
         txn = check(ctxt);
     } else {
         txn = &implicit;
-        txn->begin(env);
+        txn->begin(env, !usingJrnl());
     }
 
     try {
@@ -1026,7 +1026,7 @@
         txn = check(ctxt);
     } else {
         txn = &implicit;
-        txn->begin(env);
+        txn->begin(env, !usingJrnl());
     }
     
     try {
@@ -1211,27 +1211,27 @@
 
 void BdbMessageStore::completed(TPCTxnCtxt& txn, Db& discard, Db& apply, bool commit)
 {
-    if (!txn.get()) txn.begin(env);
+    if (!txn.get()) txn.begin(env, !usingJrnl());
 
     try {
 
         StringDbt key(txn.getXid());
         if (!usingJrnl()){
-			//scroll through all records matching xid in apply and dequeue
-         	//using the message and queue id encoded in each value
-        	Cursor c;
-        	c.open(apply, txn.get());
-       		IdPairDbt value;
-
-        	for (int status = c->get(&key, &value, DB_SET); status == 0; status = c->get(&key, &value, DB_NEXT_DUP)) {
+            //scroll through all records matching xid in apply and dequeue
+            //using the message and queue id encoded in each value
+            Cursor c;
+            c.open(apply, txn.get());
+            IdPairDbt value;
+            
+            for (int status = c->get(&key, &value, DB_SET); status == 0; status = c->get(&key, &value, DB_NEXT_DUP)) {
             	dequeue(txn.get(), value.message, value.queue);
-        	}
-        	c.close();
-
-        	//delete all records matching xid
-        	discard.del(txn.get(), &key, 0);
-        	apply.del(txn.get(), &key, 0);
-		}
+            }
+            c.close();
+            
+            //delete all records matching xid
+            discard.del(txn.get(), &key, 0);
+            apply.del(txn.get(), &key, 0);
+        }
         prepareXidDb.del(txn.get(), &key, 0);
 
         txn.complete(commit);
@@ -1247,7 +1247,7 @@
 	checkInit();
 	// pass sequence number for c/a when using jrnl
     TxnCtxt* txn(new TxnCtxt(&messageIdSequence ));
-    txn->begin(env);
+    txn->begin(env, !usingJrnl());
     return auto_ptr<TransactionContext>(txn);
 }
 
@@ -1259,7 +1259,7 @@
 
 	// pass sequence number for c/a when using jrnl
     TPCTxnCtxt* txn(new TPCTxnCtxt(xid, jtx));
-    txn->begin(env);
+    txn->begin(env, !usingJrnl());
     return auto_ptr<TPCTransactionContext>(txn);
 }
 

Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h	2007-12-04 19:30:24 UTC (rev 1423)
+++ store/trunk/cpp/lib/TxnCtxt.h	2007-12-04 20:22:39 UTC (rev 1424)
@@ -28,6 +28,7 @@
 #include <qpid/broker/MessageStore.h>
 #include <qpid/sys/Mutex.h>
 #include <boost/shared_ptr.hpp>
+#include <memory>
 #include <vector>
 #include "JournalImpl.h"
 #include "DataTokenImpl.h"
@@ -45,97 +46,105 @@
 class TxnCtxt : public qpid::broker::TransactionContext
 {
 protected:
-	typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
-	ipqdef impactedQueues; // list of Queues used in the txn
+    typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
+    typedef std::auto_ptr<qpid::sys::Mutex::ScopedLock> AutoScopedLock;
+
+    static qpid::sys::Mutex globalSerialiser;
+
+    ipqdef impactedQueues; // list of Queues used in the txn
     static unsigned int count;
     mutable qpid::sys::Mutex Lock;
-	IdSequence* loggedtx;
-	
-	unsigned int getCount() {
+    IdSequence* loggedtx;
+    AutoScopedLock globalHolder;
+    
+    unsigned int getCount() {
     	qpid::sys::Mutex::ScopedLock locker(Lock);
-		return ++count;
-	}
-	/**
-	* local txn id, if non XA.
-	*/
-	std::string tid;
-	DbTxn* txn;
-	
-	void completeTXN(bool commit){
-		for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) { 
-	   		JournalImpl* jc = static_cast<JournalImpl*>(*i);
-			if (jc && loggedtx) { /* if using journal */
-        		boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
-				dtokp->ref();
-				dtokp->set_rid(loggedtx->next());
-				try{
-					if (commit)
-						jc->txn_commit(dtokp.get(), getXid());
-					else
-					{
-						jc->txn_abort(dtokp.get(), getXid());
+        return ++count;
+    }
+    /**
+     * local txn id, if non XA.
+     */
+    std::string tid;
+    DbTxn* txn;
+    
+    void completeTXN(bool commit){
+        for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) { 
+            JournalImpl* jc = static_cast<JournalImpl*>(*i);
+            if (jc && loggedtx) { /* if using journal */
+                boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
+                dtokp->ref();
+                dtokp->set_rid(loggedtx->next());
+                try{
+                    if (commit)
+                        jc->txn_commit(dtokp.get(), getXid());
+                    else
+                        {
+                            jc->txn_abort(dtokp.get(), getXid());
                 	}
-				} catch (const journal::jexception& e) { 
-//std::cout << "Error commit" << e << std::endl;
-			  		THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
-	     		}
-			
-			}
-		}	
-		sync();
-		deleteXidRecord();
-	}
-	
+                } catch (const journal::jexception& e) { 
+                    //std::cout << "Error commit" << e << std::endl;
+                    THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
+                }
+		
+            }
+        }	
+        sync();
+        deleteXidRecord();
+    }
+    
 public:
 	
     TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), txn(0)  {
-		if (loggedtx){ tid.assign( "rhm-tid"); tid+=getCount(); }
-	}
+        if (loggedtx){ tid.assign( "rhm-tid"); tid+=getCount(); }
+    }
 	
-	/**
-	* Call to make sure all the data for this txn is written to safe store
-	*
-	*@return if the data sucessfully synced.
-	*/	
-	void sync(){
-  		bool allWritten = false;
-		bool firstloop = true;
-		while (loggedtx && !allWritten){
+    /**
+     * Call to make sure all the data for this txn is written to safe store
+     *
+     *@return if the data successfully synced.
+     */	
+    void sync(){
+        bool allWritten = false;
+        bool firstloop = true;
+        while (loggedtx && !allWritten){
             if (!firstloop) ::usleep(AIO_SLEEP_TIME); // move this into the get events call aiolib..
-			allWritten = true;
-			for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) { 
-	           	JournalImpl* jc = static_cast<JournalImpl*>(*i);
-				
-				try
-				{
-					if (jc && !(jc->is_txn_synced(getXid())))
-					{
-						if (firstloop)
-							jc->flush();
-						allWritten = false;
-						jc->get_wr_events();
-					}
-				} catch (const journal::jexception& e) { 
-//std::cout << " TxnCtxt: Error sync 3" << e << std::flush;
-			  		THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
-	     		}
-			}
-			firstloop = false;
-		}
-	}
-	
+            allWritten = true;
+            for (TxnCtxt::ipqdef::iterator i = impactedQueues.begin(); i != impactedQueues.end(); i++) { 
+                JournalImpl* jc = static_cast<JournalImpl*>(*i);
+		
+                try
+                    {
+                        if (jc && !(jc->is_txn_synced(getXid())))
+                            {
+                                if (firstloop)
+                                    jc->flush();
+                                allWritten = false;
+                                jc->get_wr_events();
+                            }
+                    } catch (const journal::jexception& e) { 
+                    //std::cout << " TxnCtxt: Error sync 3" << e << std::flush;
+                    THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
+                }
+            }
+            firstloop = false;
+        }
+    }
+    
     virtual ~TxnCtxt() { if(txn) abort(); }
-    void begin(DbEnv& env){ env.txn_begin(0, &txn, 0); }
-    void commit(){ txn->commit(0); completeTXN(true); txn = 0; }
-    void abort(){ txn->abort(); completeTXN(false); txn = 0; }
+    void begin(DbEnv& env, bool sync = false){ 
+        env.txn_begin(0, &txn, 0);  
+        if (sync) globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser)); 
+    }
+    void commit(){ txn->commit(0); completeTXN(true); txn = 0; globalHolder.reset(); }
+    void abort(){ txn->abort(); completeTXN(false); txn = 0; globalHolder.reset(); }
     DbTxn* get(){ return txn; }
     virtual bool isTPC() { return false; }
-	virtual const std::string& getXid() { return tid; }
-
-	void deleteXidRecord(){ impactedQueues.clear(); }
-	void addXidRecord(qpid::broker::ExternalQueueStore* queue){
-		impactedQueues.insert(queue); }
-
+    virtual const std::string& getXid() { return tid; }
+    
+    void deleteXidRecord(){ impactedQueues.clear(); }
+    void addXidRecord(qpid::broker::ExternalQueueStore* queue) {
+        impactedQueues.insert(queue); }
+    
 };
 
 class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
@@ -146,12 +155,15 @@
     virtual bool isTPC() { return true; }
     virtual const std::string& getXid() { return xid; }
 	// commit the BDB abort, abort commit the jnrl
-    void commit(){ txn->commit(0); txn = 0; }
-    void abort(){ txn->abort(); txn = 0; }
+    void commit(){ txn->commit(0); txn = 0; globalHolder.reset(); }
+    void abort(){ txn->abort(); txn = 0; globalHolder.reset(); }
     void complete(bool commit){ 
-	txn->commit(0); completeTXN(commit); txn = 0; } 
+	txn->commit(0); completeTXN(commit); txn = 0; 
+    } 
 };
 
+qpid::sys::Mutex TxnCtxt::globalSerialiser;
+
 }}
 
 #endif




More information about the rhmessaging-commits mailing list