[rhmessaging-commits] rhmessaging commits: r2199 - in store/branches/mrg-1.0/cpp: tests and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Jul 14 13:32:13 EDT 2008


Author: kpvdr
Date: 2008-07-14 13:32:13 -0400 (Mon, 14 Jul 2008)
New Revision: 2199

Modified:
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
   store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
   store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp
Log:
Backport of trunk r.2189: Fixed 1PC multi-queue atomic commits/aborts. Also fixed unnecessary prepared list writes that occur in local transactions that contain no durable messages.

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-07-14 17:05:31 UTC (rev 2198)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-07-14 17:32:13 UTC (rev 2199)
@@ -1218,7 +1218,7 @@
     }
 }
 
-void BdbMessageStore::completed(TPCTxnCtxt& txn,
+void BdbMessageStore::completed(TxnCtxt& txn,
                                 bool commit)
 {
     try {
@@ -1256,21 +1256,25 @@
 void BdbMessageStore::prepare(qpid::broker::TPCTransactionContext& ctxt)
 {
     checkInit();
-    TPCTxnCtxt* txn = dynamic_cast<TPCTxnCtxt*>(&ctxt);
+    TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt);
     if(!txn) throw InvalidTransactionContextException();
+    localPrepare(txn);
+}
 
+void BdbMessageStore::localPrepare(TxnCtxt* ctxt)
+{
     try {
         chkInitPreparedXidStore();
-        txn->incrDtokRef();
-        DataTokenImpl* dtokp = txn->getDtok();
+        ctxt->incrDtokRef();
+        DataTokenImpl* dtokp = ctxt->getDtok();
         dtokp->set_external_rid(true);
         dtokp->set_rid(messageIdSequence.next());
-        preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, txn->getXid(), false);
-        txn->prepare(preparedXidStorePtr.get());
+        preparedXidStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
+        ctxt->prepare(preparedXidStorePtr.get());
         // make sure all the data is written to disk before returning
-        txn->sync();
+        ctxt->sync();
     } catch (const std::exception& e) {
-        QPID_LOG(error, "Error preparing xid " << txn->getXid() << ": " << e.what());
+        QPID_LOG(error, "Error preparing xid " << ctxt->getXid() << ": " << e.what());
         throw;
     }
 }
@@ -1279,22 +1283,22 @@
 {
     checkInit();
     TxnCtxt* txn(check(&ctxt));
-    if (txn->isTPC()) {
-        completed(*dynamic_cast<TPCTxnCtxt*>(txn), true);
-    } else {
-        txn->complete(true);
+    if (!txn->isTPC()) {
+        if (txn->impactedQueuesEmpty()) return;
+        localPrepare(dynamic_cast<TxnCtxt*>(txn));
     }
+    completed(*dynamic_cast<TxnCtxt*>(txn), true);
 }
 
 void BdbMessageStore::abort(TransactionContext& ctxt)
 {
     checkInit();
     TxnCtxt* txn(check(&ctxt));
-    if (txn->isTPC()) {
-        completed(*dynamic_cast<TPCTxnCtxt*>(txn), false);
-    } else {
-        txn->complete(false);
+    if (!txn->isTPC()) {
+        if (txn->impactedQueuesEmpty()) return;
+        localPrepare(dynamic_cast<TxnCtxt*>(txn));
     }
+    completed(*dynamic_cast<TxnCtxt*>(txn), false);
 }
 
 TxnCtxt* BdbMessageStore::check(TransactionContext* ctxt)

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-07-14 17:05:31 UTC (rev 2198)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-07-14 17:32:13 UTC (rev 2199)
@@ -169,7 +169,7 @@
             bool create(Db& db,
                         IdSequence& seq,
                         const qpid::broker::Persistable& p);
-            void completed(TPCTxnCtxt& txn,
+            void completed(TxnCtxt& txn,
                            bool commit);
             void record2pcOp(Db& db,
                              TPCTxnCtxt& txn,
@@ -306,9 +306,15 @@
             void collectPreparedXids(std::set<std::string>& xids);
 
             std::auto_ptr<qpid::broker::TransactionContext> begin();
+
             std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
+
             void prepare(qpid::broker::TPCTransactionContext& ctxt);
+
+            void localPrepare(TxnCtxt* ctxt);
+
             void commit(qpid::broker::TransactionContext& ctxt);
+
             void abort(qpid::broker::TransactionContext& ctxt);
 
             qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const

Modified: store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/TxnCtxt.h	2008-07-14 17:05:31 UTC (rev 2198)
+++ store/branches/mrg-1.0/cpp/lib/TxnCtxt.h	2008-07-14 17:32:13 UTC (rev 2199)
@@ -59,6 +59,7 @@
     IdSequence* loggedtx;
     boost::intrusive_ptr<DataTokenImpl> dtokp;
     AutoScopedLock globalHolder;
+    JournalImpl* preparedXidStorePtr;
 
     /**
      * local txn id, if non XA.
@@ -72,6 +73,8 @@
             commitTxn(static_cast<JournalImpl*>(*i), commit);
         }
         impactedQueues.clear();
+        if (preparedXidStorePtr)
+            commitTxn(preparedXidStorePtr, commit);
     }
 
     void commitTxn(JournalImpl* jc, bool commit) {
@@ -95,7 +98,7 @@
 
   public:
 
-    TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), txn(0) {
+    TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), txn(0) {
         if (loggedtx) {
             std::stringstream s;
             s << "rhm-tid" << this;
@@ -125,6 +128,8 @@
             for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
                 sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
             }
+            if (preparedXidStorePtr)
+                sync_jrnl(preparedXidStorePtr, firstloop, allWritten);
             firstloop = false;
         }
     }
@@ -167,7 +172,9 @@
     virtual const std::string& getXid() { return tid; }
     
     void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
+    inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
     void complete(bool commit) { completeTxn(commit); }
+    bool impactedQueuesEmpty() { return impactedQueues.empty(); }
     DataTokenImpl* getDtok() { return dtokp.get(); }
     void incrDtokRef() { dtokp->addRef(); }
     void recoverDtok(const u_int64_t rid, const std::string xid) {
@@ -182,25 +189,9 @@
 {
   protected:
     const std::string xid;
-    JournalImpl* preparedXidStorePtr;
-    virtual void completeTxn(bool commit) {
-        TxnCtxt::completeTxn(commit);
-        if (preparedXidStorePtr) commitTxn(preparedXidStorePtr, commit);
-    }
 
   public:
-    TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid), preparedXidStorePtr(0) {}
-    void sync() {
-        TxnCtxt::sync();
-        bool allWritten = false;
-        if (preparedXidStorePtr) {
-            while (!allWritten) {
-                allWritten = true;
-                sync_jrnl(preparedXidStorePtr, true, allWritten);
-            }
-        }
-    }
-    inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
+    TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
     inline virtual bool isTPC() { return true; }
     inline virtual const std::string& getXid() { return xid; }
 };

Modified: store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp	2008-07-14 17:05:31 UTC (rev 2198)
+++ store/branches/mrg-1.0/cpp/tests/TransactionalTest.cpp	2008-07-14 17:32:13 UTC (rev 2199)
@@ -33,6 +33,7 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/RecoveryManagerImpl.h"
 #include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/log/Statement.h"
 
 using namespace rhm::bdbstore;
 using namespace qpid::broker;
@@ -55,6 +56,7 @@
         // to end to simulate multi-queue txn complete failure.
         while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin());
     }
+    void resetPreparedXidStorePtr() { preparedXidStorePtr = 0; }
 };
 
 // Test store which has sepcial begin() which returns a TestTPCTxnCtxt, and a method to check for
@@ -68,6 +70,25 @@
         // pass sequence number for c/a
         return auto_ptr<TransactionContext>(new TestTxnCtxt(&messageIdSequence));
     }
+    void commit(TransactionContext& ctxt, const bool complete_prepared_list) {
+        checkInit();
+        TxnCtxt* txn(check(&ctxt));
+        if (!txn->isTPC()) {
+            localPrepare(dynamic_cast<TxnCtxt*>(txn));
+            if (!complete_prepared_list) dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr();
+        }
+        completed(*dynamic_cast<TxnCtxt*>(txn), true);
+    }
+    void abort(TransactionContext& ctxt, const bool complete_prepared_list)
+    {
+        checkInit();
+        TxnCtxt* txn(check(&ctxt));
+        if (!txn->isTPC()) {
+            localPrepare(dynamic_cast<TxnCtxt*>(txn));
+            if (!complete_prepared_list) dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr();
+        }
+        completed(*dynamic_cast<TxnCtxt*>(txn), false);
+    }
 };
 
 // === Helper fns ===
@@ -178,10 +199,11 @@
     checkMsg(y, 0);
 }
 
-void testMultiQueueTxn(const unsigned num_queues_rem, const bool commit)
+void testMultiQueueTxn(const unsigned num_queues_rem, const bool complete_prepared_list, const bool commit)
 {
     setup<TestMessageStore>();
-    std::auto_ptr<TransactionContext> txn(static_cast<TestMessageStore*>(store.get())->begin());
+    TestMessageStore* tmsp = static_cast<TestMessageStore*>(store.get());
+    std::auto_ptr<TransactionContext> txn(tmsp->begin());
 
     //create two messages and enqueue them onto both queues:
     boost::intrusive_ptr<Message> msgA = createMessage("MessageA", "exchange", "routing_key");
@@ -193,9 +215,9 @@
 
     static_cast<TestTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem);
     if (commit)
-        store->commit(*txn);
+        tmsp->commit(*txn, complete_prepared_list);
     else
-        store->abort(*txn);
+        tmsp->abort(*txn, complete_prepared_list);
     restart<TestMessageStore>();
 
     // Check outcome
@@ -225,47 +247,61 @@
     swap(false);
     cout << "ok" << endl;
 }
-/*
+
+QPID_AUTO_TEST_CASE(MultiQueueCommit)
+{
+    cout << test_filename << ".MultiQueueCommit: " << flush;
+    testMultiQueueTxn(2, true, true);
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAbort)
+{
+    cout << test_filename << ".MultiQueueAbort: " << flush;
+    testMultiQueueTxn(2, true, true);
+    cout << "ok" << endl;
+}
+
 QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
 {
     cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush;
-    testMultiQueueTxn(0, true);
+    testMultiQueueTxn(0, false, true);
     cout << "ok" << endl;
 }
 
 QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
 {
     cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush;
-    testMultiQueueTxn(0, false);
+    testMultiQueueTxn(0, false, false);
     cout << "ok" << endl;
 }
 
 QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
 {
     cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush;
-    testMultiQueueTxn(1, true);
+    testMultiQueueTxn(1, false, true);
     cout << "ok" << endl;
 }
 
 QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
 {
     cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush;
-    testMultiQueueTxn(1, false);
+    testMultiQueueTxn(1, false, false);
     cout << "ok" << endl;
 }
 
 QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
 {
     cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush;
-    testMultiQueueTxn(2, true);
+    testMultiQueueTxn(2, false, true);
     cout << "ok" << endl;
 }
 
 QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
 {
     cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush;
-    testMultiQueueTxn(2, false);
+    testMultiQueueTxn(2, false, false);
     cout << "ok" << endl;
 }
-*/
+
 QPID_AUTO_TEST_SUITE_END()




More information about the rhmessaging-commits mailing list