[rhmessaging-commits] rhmessaging commits: r2298 - in store/trunk/cpp: lib/jrnl and 3 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Aug 13 16:27:31 EDT 2008


Author: kpvdr
Date: 2008-08-13 16:27:30 -0400 (Wed, 13 Aug 2008)
New Revision: 2298

Modified:
   store/trunk/cpp/lib/BdbMessageStore.cpp
   store/trunk/cpp/lib/BdbMessageStore.h
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/PreparedTransaction.h
   store/trunk/cpp/lib/TxnCtxt.h
   store/trunk/cpp/lib/jrnl/enq_map.cpp
   store/trunk/cpp/lib/jrnl/enq_map.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/jerrno.cpp
   store/trunk/cpp/lib/jrnl/jerrno.hpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
   store/trunk/cpp/tests/jrnl/_st_basic.cpp
   store/trunk/cpp/tests/jrnl/_st_basic_txn.cpp
   store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
   store/trunk/cpp/tests/persistence.py
Log:
Backport from store-1.0 branch: Fix for BZ458053: r.2279 "txtest failures when broker killed during transfer phase". Modified message recovery to correctly predict outcome of to-be-rolled-forward/back transactions. Access to jcntl::_emap was required for this, so some accessors were added to class jcntl. Includes fix to python file check program jfile_chk.py which incorrectly detected owi on last-to-first file transition and message content overflowed. r.2281 (minor), r.2297 Additional fixes for BZ458053 "txtest failures when broker killed during transfer phase".

Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp	2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp	2008-08-13 20:27:30 UTC (rev 2298)
@@ -52,10 +52,12 @@
 
 BdbMessageStore::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid,
                                                               const bool _deq_flag,
-                                                              const bool _commit_flag) :
+                                                              const bool _commit_flag,
+                                                              const bool _tpc_flag) :
                                                               rid(_rid),
                                                               deq_flag(_deq_flag),
-                                                              commit_flag(_commit_flag)
+                                                              commit_flag(_commit_flag),
+                                                              tpc_flag(_tpc_flag)
 {}
 
 BdbMessageStore::BdbMessageStore(const char* envpath) :
@@ -361,7 +363,7 @@
         // TODO: Is this mutex necessary?
         qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
         jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
-                                 string("JournalData"), defJournalGetEventsTimeout,
+                                 std::string("JournalData"), defJournalGetEventsTimeout,
                                  defJournalFlushTimeout);
     }
 
@@ -535,49 +537,69 @@
 
     //recover transactions:
     for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
-        TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
-        std::auto_ptr<TPCTransactionContext> txn(tpcc);
-
-        tpcc->prepare(tplStorePtr.get());
         if (mgmtObject != 0) {
             mgmtObject->inc_tplTransactionDepth();
             mgmtObject->inc_tplTxnPrepares();
-        } 
+        }
 
+        std::string xid = i->xid;
+
         // Restore data token state in TxnCtxt
-        TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+        TplRecoverMapCitr citr = tplRecoverMap.find(xid);
         if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
-        tpcc->recoverDtok(citr->second.rid, i->xid);
 
         // If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
         // was interrupted part way through committing/aborting the impacted queues. Complete this process.
-        bool incomplTxnFlag = citr->second.deq_flag;
+        bool incomplTplTxnFlag = citr->second.deq_flag;
 
-        RecoverableTransaction::shared_ptr dtx;
-        if (!incomplTxnFlag) dtx = registry.recoverTransaction(i->xid, txn);
-        if (i->enqueues.get()) {
-            for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
-                tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
-                if (!incomplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
+        if (citr->second.tpc_flag) {
+            // Dtx (2PC) transaction
+            TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence);
+            std::auto_ptr<TPCTransactionContext> txn(tpcc);
+            tpcc->recoverDtok(citr->second.rid, xid);
+            tpcc->prepare(tplStorePtr.get());
+
+            RecoverableTransaction::shared_ptr dtx;
+            if (!incomplTplTxnFlag) dtx = registry.recoverTransaction(xid, txn);
+            if (i->enqueues.get()) {
+                for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
+                    tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+                    if (!incomplTplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
+                }
             }
-        }
-        if (i->dequeues.get()) {
-            for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
-                tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
-                if (!incomplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
+            if (i->dequeues.get()) {
+                for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
+                    tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+                    if (!incomplTplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
+                }
             }
-        }
 
-        if (incomplTxnFlag) {
-            tpcc->complete(citr->second.commit_flag);
+            if (incomplTplTxnFlag) {
+                tpcc->complete(citr->second.commit_flag);
+            }
         } else {
-            if (mgmtObject != 0) {
-                mgmtObject->inc_tplTransactionDepth();
-                mgmtObject->inc_tplTxnPrepares();
-            } 
+            // Local (1PC) transaction
+            boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence));
+            opcc->recoverDtok(citr->second.rid, xid);
+            opcc->prepare(tplStorePtr.get());
+
+            if (i->enqueues.get()) {
+                for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
+                    opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+                }
+            }
+            if (i->dequeues.get()) {
+                for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
+                    opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+                }
+            }
+            if (incomplTplTxnFlag) {
+                opcc->complete(citr->second.commit_flag);
+            } else {
+                completed(*opcc.get(), citr->second.commit_flag);
+            }
         }
     }
-
     registry.recoveryComplete();
 }
 
@@ -606,7 +628,7 @@
         JournalImpl* jQueue = 0;
         {
             qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
-            jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
+            jQueue = new JournalImpl(queueName, getJrnlDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
         }
         queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
 
@@ -675,8 +697,8 @@
             THROW_STORE_EXCEPTION("Not enough data for binding");
         }
         uint64_t queueId = buffer.getLongLong();
-        string queueName;
-        string routingkey;
+        std::string queueName;
+        std::string routingkey;
         FieldTable args;
         buffer.getShortString(queueName);
         buffer.getShortString(routingkey);
@@ -774,15 +796,39 @@
                 }
 
                 PreparedTransaction::list::iterator i = PreparedTransaction::getLockedPreparedTransaction(prepared, queue->getPersistenceId(), dtokp.rid());
-                if (i == prepared.end()) { // not locked
+                if (i == prepared.end()) { // not in prepared list
                     queue->recover(msg);
                 } else {
-                    TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+                    u_int64_t rid = dtokp.rid();
+                    std::string xid(i->xid);
+                    TplRecoverMapCitr citr = tplRecoverMap.find(xid);
                     if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
-                    if (citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
-                        if (citr->second.commit_flag) queue->recover(msg); // treat as non-tx, roll forward (else aborted, throw away)
+
+                    // deq present in prepared list, this xid is part of incomplete txn commit/abort
+                    // or this is a 1PC txn that must be rolled forward
+                    if (citr->second.deq_flag || !citr->second.tpc_flag) {
+                        if (jc->is_enqueued(rid, true)) {
+                            // Enqueue is non-tx, dequeue tx
+                            assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
+                            if (!citr->second.commit_flag) {
+                                queue->recover(msg); // recover message in abort case only
+                            }
+                        } else {
+                            // Enqueue and/or dequeue tx
+                            journal::txn_map& tmap = jc->get_txn_map();
+                            journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+                            bool enq = false;
+                            bool deq = false;
+                            for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+                                if (j->_enq_flag && j->_rid == rid) enq = true;
+                                else if (!j->_enq_flag && j->_drid == rid) deq = true;
+                            }
+                            if (enq && !deq && citr->second.commit_flag) {
+                                queue->recover(msg); // recover txn message in commit case only
+                            }
+                        }
                     } else {
-                        messages[dtokp.rid()] = msg;
+                        messages[rid] = msg;
                     }
                 }
 
@@ -845,7 +891,7 @@
         txn.commit();
     } catch (const DbException& e) {
         txn.abort();
-        THROW_STORE_EXCEPTION("Unexpected BDB error in BdbMessageStore::getExternMessage(): " + string(e.what()));
+        THROW_STORE_EXCEPTION("Unexpected BDB error in BdbMessageStore::getExternMessage(): " + std::string(e.what()));
     } catch (...) {
         txn.abort();
         throw;
@@ -884,54 +930,85 @@
     return count;
 }
 
-void BdbMessageStore::recoverTplStore()
+void BdbMessageStore::recoverTplStore(TplRecoverMap& tplMap)
 {
     if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
         u_int64_t thisHighestRid;
         tplStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
+        journal::txn_map& tmap = tplStorePtr->get_txn_map();
         if (thisHighestRid > highestRid)
             highestRid = thisHighestRid;
-        tplStorePtr->recover_complete(); // start journal.
-    }
-}
 
-void BdbMessageStore::getTplRecoverMap(TplRecoverMap& tplMap)
-{
-    if (tplStorePtr.get()) {
-        if (!tplStorePtr->is_ready())
-            recoverTplStore();
+        DataTokenImpl dtokp;
+        void* dbuff = NULL; size_t dbuffSize = 0;
+        void* xidbuff = NULL; size_t xidbuffSize = 0;
+        bool transientFlag = false;
+        bool externalFlag = false;
+        bool done = false;
+        try {
+            unsigned aio_sleep_cnt = 0;
+            while (!done) {
+                dtokp.reset();
+                dtokp.set_wstate(DataTokenImpl::ENQ);
+                switch (tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp)) {
+                  case rhm::journal::RHM_IORES_SUCCESS: {
+                    // Every TPL record contains both data and an XID
+                    assert(dbuffSize>0);
+                    assert(xidbuffSize>0);
+                    std::string xid(static_cast<const char*>(xidbuff), xidbuffSize);
+                    bool is2PC = *(static_cast<char*>(dbuff)) != 0;
 
-        // TODO: The journal will return a const txn_map and the txn_map will support
-        // const operations at some point. Using non-const txn_map this way is ugly...
-        journal::txn_map& tmap = tplStorePtr->get_txn_map();
-        std::vector<std::string> xidList;
-        tmap.xid_list(xidList);
-        for (std::vector<std::string>::const_iterator i = xidList.begin(); i<xidList.end(); i++) {
-            journal::txn_data_list txnList = tmap.get_tdata_list(*i);
-            unsigned enqCnt = 0;
-            unsigned deqCnt = 0;
-            u_int64_t rid = 0;
-            bool commitFlag = false;
-            for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
-                if (j->_enq_flag) {
-                    rid = j->_rid;
-                    enqCnt++;
-                } else {
-                    commitFlag = j->_commit_flag;
-                    deqCnt++;
-                }
+                    // Check transaction details; add to recover map
+                    journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+                    unsigned enqCnt = 0;
+                    unsigned deqCnt = 0;
+                    u_int64_t rid = 0;
+
+                    // Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists.
+                    // Note: will apply to both 1PC and 2PC transactions.
+                    bool commitFlag = true;
+
+                    for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+                        if (j->_enq_flag) {
+                            rid = j->_rid;
+                            enqCnt++;
+                        } else {
+                            commitFlag = j->_commit_flag;
+                            deqCnt++;
+                        }
+                    }
+                    assert(enqCnt == 1);
+                    assert(deqCnt <= 1);
+                    tplMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC)));
+
+                    ::free(xidbuff);
+                    aio_sleep_cnt = 0;
+                    break;
+                    }
+                  case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
+                    if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+                        THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::recoverTplStore()");
+                    ::usleep(AIO_SLEEP_TIME);
+                    break;
+                  case rhm::journal::RHM_IORES_EMPTY:
+                    done = true;
+                    break; // done with all messages. (add call in jrnl to test that _emap is empty.)
+                  default:
+                    assert("Store Error: Unexpected msg state");
+                } // switch
             }
-            assert(enqCnt == 1);
-            assert(deqCnt <= 1);
-            tplMap.insert(TplRecoverMapPair(*i, TplRecoverStruct(rid, deqCnt == 1, commitFlag)));
+        } catch (const journal::jexception& e) {
+            THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
         }
+
+        tplStorePtr->recover_complete(); // start journal.
     }
 }
 
 void BdbMessageStore::recoverLockedMappings(txn_list& txns)
 {
     if (!tplStorePtr->is_ready())
-        getTplRecoverMap(tplRecoverMap);
+        recoverTplStore(tplRecoverMap);
 
     // Abort unprepaired xids and populate the locked maps
     for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
@@ -943,12 +1020,13 @@
     }
 }
 
-void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
+void BdbMessageStore::collectPreparedXids(std::set<std::string>& xids)
 {
     if (!tplStorePtr->is_ready())
-        getTplRecoverMap(tplRecoverMap);
+        recoverTplStore(tplRecoverMap);
     for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
-        if (!i->second.deq_flag) // Discard all txns that are to be rolled forward/back
+        // Discard all txns that are to be rolled forward/back and 1PC transactions
+        if (!i->second.deq_flag && i->second.tpc_flag)
             xids.insert(i->first);
     }
 }
@@ -1021,7 +1099,7 @@
     try {
         int status = db.get(txn, &key, &peek, 0);
         if (status != DB_BUFFER_SMALL) {
-            THROW_STORE_EXCEPTION("Unexpected status code when determining record length: " + string(DbEnv::strerror(status)));
+            THROW_STORE_EXCEPTION("Unexpected status code when determining record length: " + std::string(DbEnv::strerror(status)));
         }
     } catch (const DbMemoryException& expected) {
         //api doc indicates may throw exception instead of status = DB_BUFFER_SMALL;
@@ -1257,7 +1335,7 @@
     ddtokp->set_rid(messageIdSequence.next());
     ddtokp->set_dequeue_rid(msg->getPersistenceId());
     ddtokp->set_wstate(DataTokenImpl::ENQ);
-    string tid;
+    std::string tid;
     if (ctxt) {
         TxnCtxt* txn = check(ctxt);
         tid = txn->getXid();
@@ -1369,11 +1447,16 @@
     try {
         chkTplStoreInit(); // Late initialize (if needed)
 
+        // This sync is required to ensure multi-queue atomicity - ie all txn data
+        // must hit the disk on *all* queues before the TPL prepare (enq) is written.
+        ctxt->sync();
+
         ctxt->incrDtokRef();
         DataTokenImpl* dtokp = ctxt->getDtok();
         dtokp->set_external_rid(true);
         dtokp->set_rid(messageIdSequence.next());
-        tplStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
+        char tpcFlag = static_cast<char>(ctxt->isTPC());
+        tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt->getXid(), false);
         ctxt->prepare(tplStorePtr.get());
         // make sure all the data is written to disk before returning
         ctxt->sync();
@@ -1447,7 +1530,7 @@
     } else if (status == DB_NOTFOUND) {
         return false;
     } else {
-        THROW_STORE_EXCEPTION("Deletion failed: " + string(DbEnv::strerror(status)));
+        THROW_STORE_EXCEPTION("Deletion failed: " + std::string(DbEnv::strerror(status)));
     }
 }
 
@@ -1527,33 +1610,33 @@
     }
 }
 
-string BdbMessageStore::getJrnlBaseDir()
+std::string BdbMessageStore::getJrnlBaseDir()
 {
     std::stringstream dir;
     dir << storeDir << "/rhm/jrnl/" ;
     return dir.str();
 }
 
-string BdbMessageStore::getBdbBaseDir()
+std::string BdbMessageStore::getBdbBaseDir()
 {
     std::stringstream dir;
     dir << storeDir << "/rhm/dat/" ;
     return dir.str();
 }
 
-string BdbMessageStore::getTplBaseDir()
+std::string BdbMessageStore::getTplBaseDir()
 {
     std::stringstream dir;
     dir << storeDir << "/rhm/tpl/" ;
     return dir.str();
 }
 
-string BdbMessageStore::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/
+std::string BdbMessageStore::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/
 {
     return getJrnlDir(queue.getName().c_str());
 }
 
-string BdbMessageStore::getJrnlDir(const char* queueName) //for exmaple /var/rhm/ + queueDir/
+std::string BdbMessageStore::getJrnlDir(const char* queueName) //for exmaple /var/rhm/ + queueDir/
 {
     std::stringstream dir;
     dir << getJrnlBaseDir() << std::hex << std::setfill('0') << std::setw(4);

Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h	2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/BdbMessageStore.h	2008-08-13 20:27:30 UTC (rev 2298)
@@ -63,10 +63,11 @@
 
     // Structs for Transaction Recover List (TPL) recover state
     struct TplRecoverStruct {
-        u_int64_t rid;
+        u_int64_t rid; // rid of TPL record
         bool deq_flag;
         bool commit_flag;
-        TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
+        bool tpc_flag;
+        TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag);
     };
     typedef TplRecoverStruct TplRecover;
     typedef std::pair<std::string, TplRecover> TplRecoverMapPair;
@@ -157,8 +158,7 @@
                        queue_index& index,
                        txn_list& locked,
                        message_index& prepared);
-    void recoverTplStore();
-    void getTplRecoverMap(TplRecoverMap& tplMap);
+    void recoverTplStore(TplRecoverMap& tplMap);
     void recoverLockedMappings(txn_list& txns);
     TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
     void store(const qpid::broker::PersistableQueue* queue,

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2008-08-13 20:27:30 UTC (rev 2298)
@@ -214,7 +214,7 @@
         }
     }
     std::ostringstream oss2;
-    oss2 << "Recover phase I complete; highest rid found = 0x" << std::hex << highest_rid;
+    oss2 << "Recover phase 1 complete; highest rid found = 0x" << std::hex << highest_rid;
     oss2 << std::dec << "; emap.size=" << _emap.size() << "; tmap.size=" << _tmap.size();
     oss2 << "; journal now read-only.";
     log(LOG_DEBUG, oss2.str());
@@ -232,7 +232,7 @@
 JournalImpl::recover_complete()
 {
     jcntl::recover_complete();
-    log(LOG_DEBUG, "Recover phase II complete; journal now writable.");
+    log(LOG_DEBUG, "Recover phase 2 complete; journal now writable.");
 }
 
 #define MAX_AIO_SLEEPS 1000  // 10 sec 

Modified: store/trunk/cpp/lib/PreparedTransaction.h
===================================================================
--- store/trunk/cpp/lib/PreparedTransaction.h	2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/PreparedTransaction.h	2008-08-13 20:27:30 UTC (rev 2298)
@@ -47,6 +47,7 @@
 
     void add(queue_id queue, message_id message);
     bool isLocked(queue_id queue, message_id message);
+    std::size_t size() { return locked.size(); }
     iterator begin() { return locked.begin(); }
     iterator end() { return locked.end(); }
 

Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h	2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/TxnCtxt.h	2008-08-13 20:27:30 UTC (rev 2298)
@@ -86,7 +86,7 @@
             try {
                 if (commit) {
                     jc->txn_commit(dtokp.get(), getXid());
-                    if (isTPC()) sync();
+                    sync();
                 } else {
                     jc->txn_abort(dtokp.get(), getXid());
                 }
@@ -106,6 +106,8 @@
         }
     }
 
+    TxnCtxt(std::string _tid, IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), tid(_tid), txn(0) {}
+
     /**
      * Call to make sure all the data for this txn is written to safe store
      *

Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp	2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp	2008-08-13 20:27:30 UTC (rev 2298)
@@ -130,16 +130,13 @@
 }
 
 bool
-enq_map::is_enqueued(const u_int64_t rid)
+enq_map::is_enqueued(const u_int64_t rid, bool ignore_lock)
 {
-    emap_itr itr;
-    {
-        slock s(&_mutex);
-        itr = _map.find(rid);
-    }
+    slock s(&_mutex);
+    emap_itr itr = _map.find(rid);
     if (itr == _map.end()) // not found in map
         return false;
-    if (itr->second.second) // locked
+    if (!ignore_lock && itr->second.second) // locked
         return false;
     return true;
 }

Modified: store/trunk/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.hpp	2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/jrnl/enq_map.hpp	2008-08-13 20:27:30 UTC (rev 2298)
@@ -95,7 +95,7 @@
         void insert_fid(const u_int64_t rid, const u_int16_t fid, const bool locked);
         u_int16_t get_fid(const u_int64_t rid);
         u_int16_t get_remove_fid(const u_int64_t rid, const bool txn_flag = false);
-        bool is_enqueued(const u_int64_t rid);
+        bool is_enqueued(const u_int64_t rid, bool ignore_lock = false);
         void lock(const u_int64_t rid);
         void unlock(const u_int64_t rid);
         bool is_locked(const u_int64_t rid);

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-08-13 20:27:30 UTC (rev 2298)
@@ -634,8 +634,20 @@
             for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end(); itr++)
             {
                 std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list_ptr->begin(), prep_txn_list_ptr->end(), *itr);
-                if (pitr == prep_txn_list_ptr->end())
-                    _tmap.get_remove_tdata_list(*itr);
+                if (pitr == prep_txn_list_ptr->end()) // not found in prepared list
+                {
+                    txn_data_list tdl = _tmap.get_remove_tdata_list(*itr);
+                    // Unlock any affected enqueues in emap
+                    for (tdl_itr i=tdl.begin(); i<tdl.end(); i++)
+                    {
+                        if (i->_enq_flag) // enq op - decrement enqueue count
+                            rd._enq_cnt_list[i->_fid]--;
+                        else if (_emap.is_enqueued(i->_drid, true)) // deq op - unlock enq record
+                            _emap.unlock(i->_drid);
+                    }
+                    // Write abort record to disk
+                    rcvr_write_abort(rd, *itr);
+                }
             }
         }
     }
@@ -951,7 +963,7 @@
             ofsp.write((const char*)buff, JRNL_DBLK_SIZE);
             assert(!ofsp.fail());
             std::ostringstream oss;
-            oss << "Wrote filler record at offs=0x" << std::hex << file_pos << std::dec;
+            oss << "Recover phase write: Wrote filler record at offs=0x" << std::hex << file_pos << std::dec;
             this->log(LOG_NOTICE, oss.str());
             file_pos = ofsp.tellp();
         }
@@ -961,5 +973,99 @@
     }
 }
 
+// TODO - FIXME - Unify the recover and normal aio write methods.
+// Normally, writes are not performed during recover mode (journal is in read-only
+// mode) so initialization of the aio write controllers is deferred until recover
+// is complete. Currently, because the journal is still in recover mode when
+// rcvr_write_abort() is called, normal writes are not possible, so std::ofstream
+// writes are used instead. Lots of logic duplication!
+void
+jcntl::rcvr_write_abort(rcvdat& rd, std::string xid)
+{
+    const u_int32_t sblk_size = JRNL_DBLK_SIZE * JRNL_SBLK_SIZE;
+    const u_int32_t amagic = RHM_JDAT_TXA_MAGIC;
+    const u_int32_t xmagic = RHM_JDAT_EMPTY_MAGIC;
+
+    // Check last record ends on sblk boundary
+    assert(rd._eo % sblk_size == 0);
+    
+    // Find fid and posn to write
+    u_int16_t fid = rd._lfid;
+    std::streampos file_pos = rd._eo;
+    if (rd._eo/sblk_size >= _jfsize_sblks) // file full, use next file
+    {
+        if (++fid >= _num_jfiles)
+        {
+            fid = 0;
+            rd._owi = !rd._owi;
+        }
+        file_pos = 0;
+    }
+
+    // Prepare a buffer of at least 1 sblock
+    u_int32_t abort_dblks = txn_rec::size_dblks(sizeof(txn_hdr) + xid.size());
+    std::size_t buffsize = abort_dblks < JRNL_SBLK_SIZE ? JRNL_SBLK_SIZE : abort_dblks;
+    void* buff = std::malloc(buffsize * JRNL_DBLK_SIZE);
+    assert(buff != 0);
+
+    // Initialize file stream
+    std::ostringstream fn;
+    fn << _jdir.dirname() << "/" << _base_filename << ".";
+    fn << std::hex << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
+    std::ofstream ofs(fn.str().c_str(),
+            std::ios_base::in | std::ios_base::out | std::ios_base::binary);
+    if (!ofs.good())
+        throw jexception(jerrno::JERR__FILEIO, fn.str(), "jcntl", "rcvr_write_abort");
+    if (file_pos)
+        ofs.seekp(file_pos);
+    else
+    {
+        // New file, write new file header
+        file_hdr fh(RHM_JDAT_FILE_MAGIC, RHM_JDAT_VERSION, ++rd._h_rid, fid, sblk_size, rd._owi, true);
+        ofs.write((const char*)&fh, sizeof(file_hdr));
+        assert(!ofs.fail());
+        // fill remainder of sblk with fill char
+        std::memset((char*)buff, RHM_CLEAN_CHAR, sblk_size - sizeof(file_hdr));
+        ofs.write((const char*)buff, sblk_size - sizeof(file_hdr));
+        assert(!ofs.fail());
+        // log write action
+        std::ostringstream oss;
+        oss << "Recover phase write: File header in fid=" << fid << " at offs=0x0 for txn abort record";
+        this->log(LOG_NOTICE, oss.str());
+        file_pos = ofs.tellp();
+    }
+    
+    // Write abort record
+    txn_rec ar(amagic, ++rd._h_rid, xid.data(), xid.size(), rd._owi);
+    u_int32_t res = ar.encode(buff, 0, abort_dblks);
+    assert(res == abort_dblks);
+    ofs.write((const char*)buff, JRNL_DBLK_SIZE);
+    assert(!ofs.fail());
+    // log write action
+    std::ostringstream oss;
+    oss << "Recover phase write: Aborted unprepared transaction xid=" << xid << " at offs=0x" << std::hex << file_pos << std::dec;
+    this->log(LOG_NOTICE, oss.str());
+    file_pos = ofs.tellp();
+
+    // Prepare filler record
+    std::memcpy(buff, (void*)&xmagic, sizeof(xmagic));
+    std::memset((char*)buff + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
+    // Write filler as many times as needed to get to next sblk boundary
+    while (file_pos % sblk_size)
+    {
+        ofs.write((const char*)buff, JRNL_DBLK_SIZE);
+        assert(!ofs.fail());
+        std::ostringstream oss;
+        oss << "Recover phase write: Wrote filler record at offs=0x" << std::hex << file_pos << std::dec;
+        this->log(LOG_NOTICE, oss.str());
+        file_pos = ofs.tellp();
+    }
+    rd._eo = file_pos;
+
+    // Clean up
+    ofs.close();
+    std::free(buff);
+}
+
 } // namespace journal
 } // namespace rhm

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2008-08-13 20:27:30 UTC (rev 2298)
@@ -557,7 +557,15 @@
         *     false if the rid is transactionally enqueued and is not committed, or if it is
         *     locked (i.e. transactionally dequeued, but the dequeue has not been committed).
         */
-        inline bool is_enqueued(const u_int64_t rid) { return _emap.is_enqueued(rid); }
+        inline bool is_enqueued(const u_int64_t rid, bool ignore_lock = false)
+                { return _emap.is_enqueued(rid, ignore_lock); }
+        inline bool is_locked(const u_int64_t rid)
+                { if (_emap.is_enqueued(rid, true)) return _emap.is_locked(rid); return false; }
+        inline void enq_rid_list(std::vector<u_int64_t>& rids) { _emap.rid_list(rids); }
+        inline void enq_xid_list(std::vector<std::string>& xids) { _tmap.xid_list(xids); }
+        inline u_int32_t get_open_txn_cnt() const { return _tmap.size(); }
+        // TODO Make this a const, but txn_map must support const first.
+        inline txn_map& get_txn_map() { return _tmap; }
         
         /**
         * \brief Check if the journal is stopped.
@@ -606,10 +614,6 @@
 
         inline u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
         
-        inline u_int32_t get_open_txn_cnt() const { return _tmap.size(); }
-        // TODO Make this a const, but txn_map must support const first.
-        inline txn_map& get_txn_map() { return _tmap; }
-
         // Logging
         virtual void log(log_level level, const std::string& log_stmt) const;
         virtual void log(log_level level, const char* const log_stmt) const;
@@ -668,6 +672,8 @@
                 std::streampos& read_pos);
         
         void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset);
+
+        void rcvr_write_abort(rcvdat& rd, std::string xid);
     };
 
 } // namespace journal

Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp	2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp	2008-08-13 20:27:30 UTC (rev 2298)
@@ -94,6 +94,7 @@
 const u_int32_t jerrno::JERR_WMGR_BADDTOKSTATE  = 0x0802;
 const u_int32_t jerrno::JERR_WMGR_ENQDISCONT    = 0x0803;
 const u_int32_t jerrno::JERR_WMGR_DEQDISCONT    = 0x0804;
+const u_int32_t jerrno::JERR_WMGR_DEQRIDNOTENQ  = 0x0805;
 
 // class rmgr
 const u_int32_t jerrno::JERR_RMGR_UNKNOWNMAGIC  = 0x0900;
@@ -187,6 +188,7 @@
             "Enqueued new dtok when previous enqueue returned partly completed (state ENQ_PART).";
     _err_map[JERR_WMGR_DEQDISCONT] = "JERR_WMGR_DEQDISCONT: "
             "Dequeued new dtok when previous dequeue returned partly completed (state DEQ_PART).";
+    _err_map[JERR_WMGR_DEQRIDNOTENQ] = "JERR_WMGR_DEQRIDNOTENQ: Dequeue rid is not enqueued.";
 
     // class rmgr
     _err_map[JERR_RMGR_UNKNOWNMAGIC] = "JERR_RMGR_UNKNOWNMAGIC: Found record with unknown magic.";

Modified: store/trunk/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.hpp	2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/jrnl/jerrno.hpp	2008-08-13 20:27:30 UTC (rev 2298)
@@ -112,6 +112,7 @@
         static const u_int32_t JERR_WMGR_BADDTOKSTATE;  ///< Data token in illegal state.
         static const u_int32_t JERR_WMGR_ENQDISCONT;    ///< Enq. new dtok when previous part compl.
         static const u_int32_t JERR_WMGR_DEQDISCONT;    ///< Deq. new dtok when previous part compl.
+        static const u_int32_t JERR_WMGR_DEQRIDNOTENQ;  ///< Deq. rid not enqueued
 
         // class rmgr
         static const u_int32_t JERR_RMGR_UNKNOWNMAGIC;  ///< Found record with unknown magic

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2008-08-13 20:27:30 UTC (rev 2298)
@@ -1000,13 +1000,34 @@
 wmgr::dequeue_check(const std::string& xid, const u_int64_t drid)
 {
     // First check emap
-    try { _emap.get_fid(drid); }
+    bool found = false;
+    try
+    {
+        _emap.get_fid(drid);
+        found = true;
+    }
     catch(const jexception& e)
     {
         if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
             throw;
-        _tmap.get_data(xid, drid); // not in emap, try tmap
+        if (xid.size())
+            try
+            {
+                _tmap.get_data(xid, drid); // not in emap, try tmap
+                found = true;
+            }
+            catch (const jexception& e)
+            {
+                if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+                    throw;
+            }
     }
+    if (!found)
+    {
+        std::ostringstream oss;
+        oss << "jrnl=" << _jc->id() << " drid=0x" << std::hex << drid;
+        throw jexception(jerrno::JERR_WMGR_DEQRIDNOTENQ, oss.str(), "wmgr", "dequeue_check");
+    }
 }
 
 void

Modified: store/trunk/cpp/tests/jrnl/_st_basic.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_basic.cpp	2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/tests/jrnl/_st_basic.cpp	2008-08-13 20:27:30 UTC (rev 2298)
@@ -408,7 +408,7 @@
         u_int64_t rid = enq_msg(jc, 0, create_msg(msg, 0, MSG_SIZE), false);
         deq_msg(jc, rid);
         try{ deq_msg(jc, rid); BOOST_ERROR("Did not throw exception on second dequeue."); }
-        catch (const jexception& e){ BOOST_CHECK_EQUAL(e.err_code(), jerrno::JERR_MAP_NOTFOUND); }
+        catch (const jexception& e){ BOOST_CHECK_EQUAL(e.err_code(), jerrno::JERR_WMGR_DEQRIDNOTENQ); }
         rid = enq_msg(jc, 1, create_msg(msg, 1, MSG_SIZE), false);
         deq_msg(jc, rid);
     }

Modified: store/trunk/cpp/tests/jrnl/_st_basic_txn.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_basic_txn.cpp	2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/tests/jrnl/_st_basic_txn.cpp	2008-08-13 20:27:30 UTC (rev 2298)
@@ -87,9 +87,9 @@
             try
             {
                 deq_msg(jc, m);
-                BOOST_ERROR("Expected dequeue to fail with exception JERR_MAP_NOTFOUND.");
+                BOOST_ERROR("Expected dequeue to fail with exception JERR_WMGR_DEQRIDNOTENQ.");
             }
-            catch (const jexception& e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw; }
+            catch (const jexception& e) { if (e.err_code() != jerrno::JERR_WMGR_DEQRIDNOTENQ) throw; }
         }
     }
     catch(const exception& e) { BOOST_FAIL(e.what()); }
@@ -138,9 +138,9 @@
             try
             {
                 deq_msg(jc, 3*m);
-                BOOST_ERROR("Expected dequeue to fail with exception JERR_MAP_NOTFOUND.");
+                BOOST_ERROR("Expected dequeue to fail with exception JERR_WMGR_DEQRIDNOTENQ.");
             }
-            catch (const jexception& e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw; }
+            catch (const jexception& e) { if (e.err_code() != jerrno::JERR_WMGR_DEQRIDNOTENQ) throw; }
         }
     }
     catch(const exception& e) { BOOST_FAIL(e.what()); }

Modified: store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py	2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py	2008-08-13 20:27:30 UTC (rev 2298)
@@ -445,6 +445,7 @@
         self.first_rec = False
         self.last_file = False
         self.last_rid = -1
+        self.fhdr_owi_at_msg_start = None
 
         self.proc_args(argv)
         self.proc_csv()
@@ -474,6 +475,7 @@
                 stop = True;
             else:
                 self.rec_cnt += 1
+                self.fhdr_owi_at_msg_start = self.fhdr.owi()
                 if self.first_rec:
                     if self.fhdr.fro != hdr.foffs:
                         raise Exception('File header first record offset mismatch: fro=0x%08x; rec_offs=0x%08x' % (self.fhdr.fro, hdr.foffs))
@@ -577,7 +579,8 @@
                                 for rec in self.tmap[hdr.xid]:
                                     if isinstance(rec[1], DeqHdr):
                                         if self.emap[rec[1].deq_rid] != None:
-                                            self.emap[rec[1].deq_rid][2] = False # Unlock enq record
+                                            t = self.emap[rec[1].deq_rid]
+                                            self.emap[rec[1].deq_rid] = (t[0], t[1], False) # Unlock enq record
                             del self.tmap[hdr.xid]
                             if len(mismatched_rids) > 0:
                                 warn = ' (WARNING: transactional dequeues not found in enqueue map; rids=%s)' % mismatched_rids
@@ -656,7 +659,7 @@
         return self.file_num
 
     def check_owi(self, hdr):
-        return self.fhdr.owi() == hdr.owi()
+        return self.fhdr_owi_at_msg_start == hdr.owi()
 
     def check_rid(self, hdr):
         if  self.last_rid != -1 and hdr.rid <= self.last_rid:

Modified: store/trunk/cpp/tests/persistence.py
===================================================================
--- store/trunk/cpp/tests/persistence.py	2008-08-13 19:24:20 UTC (rev 2297)
+++ store/trunk/cpp/tests/persistence.py	2008-08-13 20:27:30 UTC (rev 2298)
@@ -231,7 +231,7 @@
         if txc.global_id not in ids:    
             self.fail("Recovered xids not as expected. missing: %s" % (txc))
         if txd.global_id not in ids:    
-            self.fail("Recovered xids not as expected. missing: %s" % (txc))
+            self.fail("Recovered xids not as expected. missing: %s" % (txd))
         self.assertEqual(2, len(xids))    
 
 




More information about the rhmessaging-commits mailing list