[rhmessaging-commits] rhmessaging commits: r2279 - in store/branches/mrg-1.0/cpp: lib/jrnl and 3 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Aug 12 11:27:18 EDT 2008


Author: kpvdr
Date: 2008-08-12 11:27:18 -0400 (Tue, 12 Aug 2008)
New Revision: 2279

Modified:
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
   store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
   store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
   store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.hpp
   store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
   store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.hpp
   store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
   store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp
   store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic_txn.cpp
   store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py
   store/branches/mrg-1.0/cpp/tests/persistence.py
Log:
Fix for BZ458053: "txtest failures when broker killed during transfer phase". Modified message recovery to correctly predict the outcome of transactions that are to be rolled forward or back. Access to jcntl::_emap was required for this, so some accessors were added to class jcntl. Also includes a fix to the Python file-check program jfile_chk.py, which incorrectly detected an owi error when a message's content overflowed across the last-to-first file transition.

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-08-12 15:27:18 UTC (rev 2279)
@@ -52,10 +52,12 @@
 
 BdbMessageStore::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid,
                                                               const bool _deq_flag,
-                                                              const bool _commit_flag) :
+                                                              const bool _commit_flag,
+                                                              const bool _tpc_flag) :
                                                               rid(_rid),
                                                               deq_flag(_deq_flag),
-                                                              commit_flag(_commit_flag)
+                                                              commit_flag(_commit_flag),
+                                                              tpc_flag(_tpc_flag)
 {}
 
 BdbMessageStore::BdbMessageStore(const char* envpath) :
@@ -347,7 +349,7 @@
     {
         qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
         jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
-                                 string("JournalData"), defJournalGetEventsTimeout,
+                                 std::string("JournalData"), defJournalGetEventsTimeout,
                                  defJournalFlushTimeout);
     }
 
@@ -521,36 +523,61 @@
 
     //recover transactions:
     for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
-        TPCTxnCtxt* tpcc = new TPCTxnCtxt(i->xid, &messageIdSequence);
-        std::auto_ptr<TPCTransactionContext> txn(tpcc);
+        std::string xid = i->xid;
 
-        tpcc->prepare(tplStorePtr.get());
-
         // Restore data token state in TxnCtxt
-        TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+        TplRecoverMapCitr citr = tplRecoverMap.find(xid);
         if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
-        tpcc->recoverDtok(citr->second.rid, i->xid);
 
         // If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
         // was interrupted part way through committing/aborting the impacted queues. Complete this process.
-        bool incomplTxnFlag = citr->second.deq_flag;
+        bool incomplTplTxnFlag = citr->second.deq_flag;
 
-        RecoverableTransaction::shared_ptr dtx;
-        if (!incomplTxnFlag) dtx = registry.recoverTransaction(i->xid, txn);
-        if (i->enqueues.get()) {
-            for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
-                tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
-                if (!incomplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
+        if (citr->second.tpc_flag) {
+            // Dtx (2PC) transaction
+            TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence);
+            std::auto_ptr<TPCTransactionContext> txn(tpcc);
+            tpcc->recoverDtok(citr->second.rid, xid);
+            tpcc->prepare(tplStorePtr.get());
+
+            RecoverableTransaction::shared_ptr dtx;
+            if (!incomplTplTxnFlag) dtx = registry.recoverTransaction(xid, txn);
+            if (i->enqueues.get()) {
+                for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
+                    tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+                    if (!incomplTplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
+                }
             }
-        }
-        if (i->dequeues.get()) {
-            for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
-                tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
-                if (!incomplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
+            if (i->dequeues.get()) {
+                for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
+                    tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+                    if (!incomplTplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
+                }
             }
+
+            if (incomplTplTxnFlag) {
+                tpcc->complete(citr->second.commit_flag);
+            }
+        } else {
+            // Local (1PC) transaction
+            boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence));
+            opcc->recoverDtok(citr->second.rid, xid);
+            opcc->prepare(tplStorePtr.get());
+
+            if (i->enqueues.get()) {
+                for (LockedMappings::iterator j = i->enqueues->begin(); j != i->enqueues->end(); j++) {
+                    opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+                }
+            }
+            if (i->dequeues.get()) {
+                for (LockedMappings::iterator j = i->dequeues->begin(); j != i->dequeues->end(); j++) {
+                    opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
+                }
+            }
+            if (incomplTplTxnFlag) {
+                opcc->complete(citr->second.commit_flag);
+            }
         }
-
-        if (incomplTxnFlag) tpcc->complete(citr->second.commit_flag);
     }
     registry.recoveryComplete();
 }
@@ -580,7 +607,7 @@
         JournalImpl* jQueue = 0;
         {
             qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
-            jQueue = new JournalImpl(queueName, getJrnlDir(queueName), string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
+            jQueue = new JournalImpl(queueName, getJrnlDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout);
         }
         queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
 
@@ -649,8 +676,8 @@
             THROW_STORE_EXCEPTION("Not enough data for binding");
         }
         uint64_t queueId = buffer.getLongLong();
-        string queueName;
-        string routingkey;
+        std::string queueName;
+        std::string routingkey;
         FieldTable args;
         buffer.getShortString(queueName);
         buffer.getShortString(routingkey);
@@ -695,6 +722,7 @@
                                       txn_list& prepared,
                                       message_index& messages)
 {
+//std::cout << "***** recoverMessages(): queue=" << queue->getName() << std::endl;
     size_t preambleLength = sizeof(u_int32_t)/*header size*/;
 
     JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
@@ -748,15 +776,36 @@
                 }
 
                 PreparedTransaction::list::iterator i = PreparedTransaction::getLockedPreparedTransaction(prepared, queue->getPersistenceId(), dtokp.rid());
-                if (i == prepared.end()) { // not locked
+                if (i == prepared.end()) { // not in prepared list
                     queue->recover(msg);
                 } else {
-                    TplRecoverMapCitr citr = tplRecoverMap.find(i->xid);
+                    u_int64_t rid = dtokp.rid();
+                    std::string xid(i->xid);
+                    TplRecoverMapCitr citr = tplRecoverMap.find(xid);
                     if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
                     if (citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
-                        if (citr->second.commit_flag) queue->recover(msg); // treat as non-tx, roll forward (else aborted, throw away)
+                        if (jc->is_enqueued(rid, true)) {
+                            // Enqueue is non-tx, dequeue tx
+                            assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
+                            if (!citr->second.commit_flag) {
+                                queue->recover(msg); // recover message in abort case only
+                            }
+                        } else {
+                            // Enqueue and/or dequeue tx
+                            journal::txn_map& tmap = jc->get_txn_map();
+                            journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+                            bool enq = false;
+                            bool deq = false;
+                            for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+                                if (j->_enq_flag && j->_rid == rid) enq = true;
+                                else if (!j->_enq_flag && j->_drid == rid) deq = true;
+                            }
+                            if (enq && !deq && citr->second.commit_flag) {
+                                queue->recover(msg); // recover txn message in commit case only
+                            }
+                        }
                     } else {
-                        messages[dtokp.rid()] = msg;
+                        messages[rid] = msg;
                     }
                 }
 
@@ -818,7 +867,7 @@
         txn.commit();
     } catch (const DbException& e) {
         txn.abort();
-        THROW_STORE_EXCEPTION("Unexpected BDB error in BdbMessageStore::getExternMessage(): " + string(e.what()));
+        THROW_STORE_EXCEPTION("Unexpected BDB error in BdbMessageStore::getExternMessage(): " + std::string(e.what()));
     } catch (...) {
         txn.abort();
         throw;
@@ -857,54 +906,81 @@
     return count;
 }
 
-void BdbMessageStore::recoverTplStore()
+void BdbMessageStore::recoverTplStore(TplRecoverMap& tplMap)
 {
     if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
         u_int64_t thisHighestRid;
         tplStorePtr->recover(tplNumJrnlFiles, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
+        journal::txn_map& tmap = tplStorePtr->get_txn_map();
         if (thisHighestRid > highestRid)
             highestRid = thisHighestRid;
-        tplStorePtr->recover_complete(); // start journal.
-    }
-}
 
-void BdbMessageStore::getTplRecoverMap(TplRecoverMap& tplMap)
-{
-    if (tplStorePtr.get()) {
-        if (!tplStorePtr->is_ready())
-            recoverTplStore();
+        DataTokenImpl dtokp;
+        void* dbuff = NULL; size_t dbuffSize = 0;
+        void* xidbuff = NULL; size_t xidbuffSize = 0;
+        bool transientFlag = false;
+        bool externalFlag = false;
+        bool done = false;
+        try {
+            unsigned aio_sleep_cnt = 0;
+            while (!done) {
+                dtokp.reset();
+                dtokp.set_wstate(DataTokenImpl::ENQ);
+                switch (tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtokp)) {
+                  case rhm::journal::RHM_IORES_SUCCESS: {
+                    // Every TPL record contains both data and an XID
+                    assert(dbuffSize>0);
+                    assert(xidbuffSize>0);
+                    std::string xid(static_cast<const char*>(xidbuff), xidbuffSize);
+                    bool is2PC = *(static_cast<char*>(dbuff)) != 0;
 
-        // TODO: The journal will return a const txn_map and the txn_map will support
-        // const operations at some point. Using non-const txn_map this way is ugly...
-        journal::txn_map& tmap = tplStorePtr->get_txn_map();
-        std::vector<std::string> xidList;
-        tmap.xid_list(xidList);
-        for (std::vector<std::string>::const_iterator i = xidList.begin(); i<xidList.end(); i++) {
-            journal::txn_data_list txnList = tmap.get_tdata_list(*i);
-            unsigned enqCnt = 0;
-            unsigned deqCnt = 0;
-            u_int64_t rid = 0;
-            bool commitFlag = false;
-            for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
-                if (j->_enq_flag) {
-                    rid = j->_rid;
-                    enqCnt++;
-                } else {
-                    commitFlag = j->_commit_flag;
-                    deqCnt++;
-                }
+                    // Check transaction details; add to recover map
+                    journal::txn_data_list txnList = tmap.get_tdata_list(xid);
+                    unsigned enqCnt = 0;
+                    unsigned deqCnt = 0;
+                    u_int64_t rid = 0;
+                    bool commitFlag = false;
+                    for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+                        if (j->_enq_flag) {
+                            rid = j->_rid;
+                            enqCnt++;
+                        } else {
+                            commitFlag = j->_commit_flag;
+                            deqCnt++;
+                        }
+                    }
+                    assert(enqCnt == 1);
+                    assert(deqCnt <= 1);
+                    tplMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC)));
+
+                    ::free(xidbuff);
+                    aio_sleep_cnt = 0;
+                    break;
+                    }
+                  case rhm::journal::RHM_IORES_PAGE_AIOWAIT:
+                    if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
+                        THROW_STORE_EXCEPTION("Timeout waiting for AIO in BdbMessageStore::recoverTplStore()");
+                    ::usleep(AIO_SLEEP_TIME);
+                    break;
+                  case rhm::journal::RHM_IORES_EMPTY:
+                    done = true;
+                    break; // done with all messages. (add call in jrnl to test that _emap is empty.)
+                  default:
+                    assert("Store Error: Unexpected msg state");
+                } // switch
             }
-            assert(enqCnt == 1);
-            assert(deqCnt <= 1);
-            tplMap.insert(TplRecoverMapPair(*i, TplRecoverStruct(rid, deqCnt == 1, commitFlag)));
+        } catch (const journal::jexception& e) {
+            THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
         }
+
+        tplStorePtr->recover_complete(); // start journal.
     }
 }
 
 void BdbMessageStore::recoverLockedMappings(txn_list& txns)
 {
     if (!tplStorePtr->is_ready())
-        getTplRecoverMap(tplRecoverMap);
+        recoverTplStore(tplRecoverMap);
 
     // Abort unprepaired xids and populate the locked maps
     for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
@@ -916,12 +992,13 @@
     }
 }
 
-void BdbMessageStore::collectPreparedXids(std::set<string>& xids)
+void BdbMessageStore::collectPreparedXids(std::set<std::string>& xids)
 {
     if (!tplStorePtr->is_ready())
-        getTplRecoverMap(tplRecoverMap);
+        recoverTplStore(tplRecoverMap);
     for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
-        if (!i->second.deq_flag) // Discard all txns that are to be rolled forward/back
+        // Discard all txns that are to be rolled forward/back and 1PC transactions
+        if (!i->second.deq_flag && i->second.tpc_flag)
             xids.insert(i->first);
     }
 }
@@ -994,7 +1071,7 @@
     try {
         int status = db.get(txn, &key, &peek, 0);
         if (status != DB_BUFFER_SMALL) {
-            THROW_STORE_EXCEPTION("Unexpected status code when determining record length: " + string(DbEnv::strerror(status)));
+            THROW_STORE_EXCEPTION("Unexpected status code when determining record length: " + std::string(DbEnv::strerror(status)));
         }
     } catch (const DbMemoryException& expected) {
         //api doc indicates may throw exception instead of status = DB_BUFFER_SMALL;
@@ -1230,7 +1307,7 @@
     ddtokp->set_rid(messageIdSequence.next());
     ddtokp->set_dequeue_rid(msg->getPersistenceId());
     ddtokp->set_wstate(DataTokenImpl::ENQ);
-    string tid;
+    std::string tid;
     if (ctxt) {
         TxnCtxt* txn = check(ctxt);
         tid = txn->getXid();
@@ -1339,7 +1416,8 @@
         DataTokenImpl* dtokp = ctxt->getDtok();
         dtokp->set_external_rid(true);
         dtokp->set_rid(messageIdSequence.next());
-        tplStorePtr->enqueue_txn_data_record(0, 0, 0, dtokp, ctxt->getXid(), false);
+        char tpcFlag = static_cast<char>(ctxt->isTPC());
+        tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt->getXid(), false);
         ctxt->prepare(tplStorePtr.get());
         // make sure all the data is written to disk before returning
         ctxt->sync();
@@ -1409,7 +1487,7 @@
     } else if (status == DB_NOTFOUND) {
         return false;
     } else {
-        THROW_STORE_EXCEPTION("Deletion failed: " + string(DbEnv::strerror(status)));
+        THROW_STORE_EXCEPTION("Deletion failed: " + std::string(DbEnv::strerror(status)));
     }
 }
 
@@ -1489,33 +1567,33 @@
     }
 }
 
-string BdbMessageStore::getJrnlBaseDir()
+std::string BdbMessageStore::getJrnlBaseDir()
 {
     std::stringstream dir;
     dir << storeDir << "/rhm/jrnl/" ;
     return dir.str();
 }
 
-string BdbMessageStore::getBdbBaseDir()
+std::string BdbMessageStore::getBdbBaseDir()
 {
     std::stringstream dir;
     dir << storeDir << "/rhm/dat/" ;
     return dir.str();
 }
 
-string BdbMessageStore::getTplBaseDir()
+std::string BdbMessageStore::getTplBaseDir()
 {
     std::stringstream dir;
     dir << storeDir << "/rhm/tpl/" ;
     return dir.str();
 }
 
-string BdbMessageStore::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/
+std::string BdbMessageStore::getJrnlDir(const qpid::broker::PersistableQueue& queue) //for exmaple /var/rhm/ + queueDir/
 {
     return getJrnlDir(queue.getName().c_str());
 }
 
-string BdbMessageStore::getJrnlDir(const char* queueName) //for exmaple /var/rhm/ + queueDir/
+std::string BdbMessageStore::getJrnlDir(const char* queueName) //for exmaple /var/rhm/ + queueDir/
 {
     std::stringstream dir;
     dir << getJrnlBaseDir() << std::hex << std::setfill('0') << std::setw(4);

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.h	2008-08-12 15:27:18 UTC (rev 2279)
@@ -63,10 +63,11 @@
 
     // Structs for Transaction Recover List (TPL) recover state
     struct TplRecoverStruct {
-        u_int64_t rid;
+        u_int64_t rid; // rid of TPL record
         bool deq_flag;
         bool commit_flag;
-        TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag);
+        bool tpc_flag;
+        TplRecoverStruct(const u_int64_t _rid, const bool _deq_flag, const bool _commit_flag, const bool _tpc_flag);
     };
     typedef TplRecoverStruct TplRecover;
     typedef std::pair<std::string, TplRecover> TplRecoverMapPair;
@@ -157,8 +158,7 @@
                        queue_index& index,
                        txn_list& locked,
                        message_index& prepared);
-    void recoverTplStore();
-    void getTplRecoverMap(TplRecoverMap& tplMap);
+    void recoverTplStore(TplRecoverMap& tplMap);
     void recoverLockedMappings(txn_list& txns);
     TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
     void store(const qpid::broker::PersistableQueue* queue,

Modified: store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp	2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/JournalImpl.cpp	2008-08-12 15:27:18 UTC (rev 2279)
@@ -199,7 +199,7 @@
         }
     }
     std::ostringstream oss2;
-    oss2 << "Recover phase I complete; highest rid found = 0x" << std::hex << highest_rid;
+    oss2 << "Recover phase 1 complete; highest rid found = 0x" << std::hex << highest_rid;
     oss2 << std::dec << "; emap.size=" << _emap.size() << "; tmap.size=" << _tmap.size();
     oss2 << "; journal now read-only.";
     log(LOG_DEBUG, oss2.str());
@@ -219,7 +219,7 @@
 JournalImpl::recover_complete()
 {
     jcntl::recover_complete();
-    log(LOG_DEBUG, "Recover phase II complete; journal now writable.");
+    log(LOG_DEBUG, "Recover phase 2 complete; journal now writable.");
 }
 
 #define MAX_AIO_SLEEPS 1000  // 10 sec 

Modified: store/branches/mrg-1.0/cpp/lib/TxnCtxt.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/TxnCtxt.h	2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/TxnCtxt.h	2008-08-12 15:27:18 UTC (rev 2279)
@@ -86,7 +86,7 @@
             try {
                 if (commit) {
                     jc->txn_commit(dtokp.get(), getXid());
-                    if (isTPC()) sync();
+                    sync();
                 } else {
                     jc->txn_abort(dtokp.get(), getXid());
                 }
@@ -106,6 +106,8 @@
         }
     }
 
+    TxnCtxt(std::string _tid, IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), tid(_tid), txn(0) {}
+
     /**
      * Call to make sure all the data for this txn is written to safe store
      *

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.cpp	2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.cpp	2008-08-12 15:27:18 UTC (rev 2279)
@@ -130,16 +130,13 @@
 }
 
 bool
-enq_map::is_enqueued(const u_int64_t rid)
+enq_map::is_enqueued(const u_int64_t rid, bool ignore_lock)
 {
-    emap_itr itr;
-    {
-        slock s(&_mutex);
-        itr = _map.find(rid);
-    }
+    slock s(&_mutex);
+    emap_itr itr = _map.find(rid);
     if (itr == _map.end()) // not found in map
         return false;
-    if (itr->second.second) // locked
+    if (!ignore_lock && itr->second.second) // locked
         return false;
     return true;
 }

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.hpp	2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/enq_map.hpp	2008-08-12 15:27:18 UTC (rev 2279)
@@ -95,7 +95,7 @@
         void insert_fid(const u_int64_t rid, const u_int16_t fid, const bool locked);
         u_int16_t get_fid(const u_int64_t rid);
         u_int16_t get_remove_fid(const u_int64_t rid, const bool txn_flag = false);
-        bool is_enqueued(const u_int64_t rid);
+        bool is_enqueued(const u_int64_t rid, bool ignore_lock = false);
         void lock(const u_int64_t rid);
         void unlock(const u_int64_t rid);
         bool is_locked(const u_int64_t rid);

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp	2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp	2008-08-12 15:27:18 UTC (rev 2279)
@@ -557,8 +557,15 @@
         *     false if the rid is transactionally enqueued and is not committed, or if it is
         *     locked (i.e. transactionally dequeued, but the dequeue has not been committed).
         */
-        inline bool is_enqueued(const u_int64_t rid) { return _emap.is_enqueued(rid); }
-        
+        inline bool is_enqueued(const u_int64_t rid, bool ignore_lock = false)
+                { return _emap.is_enqueued(rid, ignore_lock); }
+        inline bool is_locked(const u_int64_t rid)
+                { if (_emap.is_enqueued(rid, true)) return _emap.is_locked(rid); return false; }
+        inline void enq_rid_list(std::vector<u_int64_t>& rids) { _emap.rid_list(rids); }
+        inline void enq_xid_list(std::vector<std::string>& xids) { _tmap.xid_list(xids); }
+        // TODO Make this a const, but txn_map must support const first.
+        inline txn_map& get_txn_map() { return _tmap; }
+
         /**
         * \brief Check if the journal is stopped.
         *
@@ -607,8 +614,6 @@
         inline u_int32_t jfsize_sblks() const { return _jfsize_sblks; }
         
         inline u_int32_t get_open_txn_cnt() const { return _tmap.size(); }
-        // TODO Make this a const, but txn_map must support const first.
-        inline txn_map& get_txn_map() { return _tmap; }
 
         // Logging
         virtual void log(log_level level, const std::string& log_stmt) const;

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.cpp	2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.cpp	2008-08-12 15:27:18 UTC (rev 2279)
@@ -94,6 +94,7 @@
 const u_int32_t jerrno::JERR_WMGR_BADDTOKSTATE  = 0x0802;
 const u_int32_t jerrno::JERR_WMGR_ENQDISCONT    = 0x0803;
 const u_int32_t jerrno::JERR_WMGR_DEQDISCONT    = 0x0804;
+const u_int32_t jerrno::JERR_WMGR_DEQRIDNOTENQ  = 0x0805;
 
 // class rmgr
 const u_int32_t jerrno::JERR_RMGR_UNKNOWNMAGIC  = 0x0900;
@@ -187,7 +188,9 @@
             "Enqueued new dtok when previous enqueue returned partly completed (state ENQ_PART).";
     _err_map[JERR_WMGR_DEQDISCONT] = "JERR_WMGR_DEQDISCONT: "
             "Dequeued new dtok when previous dequeue returned partly completed (state DEQ_PART).";
+    _err_map[JERR_WMGR_DEQRIDNOTENQ] = "JERR_WMGR_DEQRIDNOTENQ: Dequeue rid is not enqueued.";
 
+
     // class rmgr
     _err_map[JERR_RMGR_UNKNOWNMAGIC] = "JERR_RMGR_UNKNOWNMAGIC: Found record with unknown magic.";
     _err_map[JERR_RMGR_RIDMISMATCH] = "JERR_RMGR_RIDMISMATCH: "

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.hpp	2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jerrno.hpp	2008-08-12 15:27:18 UTC (rev 2279)
@@ -112,6 +112,7 @@
         static const u_int32_t JERR_WMGR_BADDTOKSTATE;  ///< Data token in illegal state.
         static const u_int32_t JERR_WMGR_ENQDISCONT;    ///< Enq. new dtok when previous part compl.
         static const u_int32_t JERR_WMGR_DEQDISCONT;    ///< Deq. new dtok when previous part compl.
+        static const u_int32_t JERR_WMGR_DEQRIDNOTENQ;  ///< Deq. rid not enqueued
 
         // class rmgr
         static const u_int32_t JERR_RMGR_UNKNOWNMAGIC;  ///< Found record with unknown magic

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp	2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/wmgr.cpp	2008-08-12 15:27:18 UTC (rev 2279)
@@ -39,6 +39,8 @@
 #include "jrnl/jerrno.hpp"
 #include <sstream>
 
+//#include <iostream> // debug
+
 namespace rhm
 {
 namespace journal
@@ -575,6 +577,7 @@
         dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
         _commit_busy = true;
     }
+//std::cout << " * commit, queue=" << _jc->id() << " xid=" << dtokp->xid() << std::endl << std::flush;
     bool done = false;
     while (!done)
     {
@@ -605,9 +608,12 @@
             for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
             {
                 if (itr->_enq_flag) // txn enqueue
+//{ std::cout << " * commit enq - add enq rid=0x" << std::hex << itr->_rid << std::dec << std::endl << std::flush;
                     _emap.insert_fid(itr->_rid, itr->_fid);
+//}
                 else // txn dequeue
                 {
+//std::cout << " * commit deq - remove enq rid=0x" << std::hex << itr->_drid << std::dec << std::endl << std::flush;
                     u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
                     _wrfc.decr_enqcnt(fid);
                 }
@@ -834,6 +840,7 @@
 #endif
                         break;
                     case data_tok::COMMIT_SUBM:
+//std::cout << " * commit-ret, queue=" << _jc->id() << " xid=" << dtokp->xid() << std::endl << std::flush;
                         dtokl.push_back(dtokp);
                         tot_data_toks++;
                         dtokp->set_wstate(data_tok::COMMITTED);
@@ -1018,13 +1025,34 @@
 wmgr::dequeue_check(const std::string& xid, const u_int64_t drid)
 {
     // First check emap
-    try { _emap.get_fid(drid); }
+    bool found = false;
+    try
+    {
+        _emap.get_fid(drid);
+        found = true;
+    }
     catch(const jexception& e)
     {
         if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
             throw;
-        _tmap.get_data(xid, drid); // not in emap, try tmap
+        if (xid.size())
+            try
+            {
+                _tmap.get_data(xid, drid); // not in emap, try tmap
+                found = true;
+            }
+            catch (const jexception& e)
+            {
+                if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+                    throw;
+            }
     }
+    if (!found)
+    {
+        std::ostringstream oss;
+        oss << "jrnl=" << _jc->id() << " drid=0x" << std::hex << drid;
+        throw jexception(jerrno::JERR_WMGR_DEQRIDNOTENQ, oss.str(), "wmgr", "dequeue_check");
+    }
 }
 
 void

Modified: store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp	2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic.cpp	2008-08-12 15:27:18 UTC (rev 2279)
@@ -408,7 +408,7 @@
         u_int64_t rid = enq_msg(jc, 0, create_msg(msg, 0, MSG_SIZE), false);
         deq_msg(jc, rid);
         try{ deq_msg(jc, rid); BOOST_ERROR("Did not throw exception on second dequeue."); }
-        catch (const jexception& e){ BOOST_CHECK_EQUAL(e.err_code(), jerrno::JERR_MAP_NOTFOUND); }
+        catch (const jexception& e){ BOOST_CHECK_EQUAL(e.err_code(), jerrno::JERR_WMGR_DEQRIDNOTENQ); }
         rid = enq_msg(jc, 1, create_msg(msg, 1, MSG_SIZE), false);
         deq_msg(jc, rid);
     }

Modified: store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic_txn.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic_txn.cpp	2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/_st_basic_txn.cpp	2008-08-12 15:27:18 UTC (rev 2279)
@@ -87,9 +87,9 @@
             try
             {
                 deq_msg(jc, m);
-                BOOST_ERROR("Expected dequeue to fail with exception JERR_MAP_NOTFOUND.");
+                BOOST_ERROR("Expected dequeue to fail with exception JERR_WMGR_DEQRIDNOTENQ.");
             }
-            catch (const jexception& e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw; }
+            catch (const jexception& e) { if (e.err_code() != jerrno::JERR_WMGR_DEQRIDNOTENQ) throw; }
         }
     }
     catch(const exception& e) { BOOST_FAIL(e.what()); }
@@ -138,9 +138,9 @@
             try
             {
                 deq_msg(jc, 3*m);
-                BOOST_ERROR("Expected dequeue to fail with exception JERR_MAP_NOTFOUND.");
+                BOOST_ERROR("Expected dequeue to fail with exception JERR_WMGR_DEQRIDNOTENQ.");
             }
-            catch (const jexception& e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw; }
+            catch (const jexception& e) { if (e.err_code() != jerrno::JERR_WMGR_DEQRIDNOTENQ) throw; }
         }
     }
     catch(const exception& e) { BOOST_FAIL(e.what()); }

Modified: store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py	2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py	2008-08-12 15:27:18 UTC (rev 2279)
@@ -445,6 +445,7 @@
         self.first_rec = False
         self.last_file = False
         self.last_rid = -1
+        self.fhdr_owi_at_msg_start = None
 
         self.proc_args(argv)
         self.proc_csv()
@@ -474,6 +475,7 @@
                 stop = True;
             else:
                 self.rec_cnt += 1
+                self.fhdr_owi_at_msg_start = self.fhdr.owi()
                 if self.first_rec:
                     if self.fhdr.fro != hdr.foffs:
                         raise Exception('File header first record offset mismatch: fro=0x%08x; rec_offs=0x%08x' % (self.fhdr.fro, hdr.foffs))
@@ -656,7 +658,7 @@
         return self.file_num
 
     def check_owi(self, hdr):
-        return self.fhdr.owi() == hdr.owi()
+        return self.fhdr_owi_at_msg_start == hdr.owi()
 
     def check_rid(self, hdr):
         if  self.last_rid != -1 and hdr.rid <= self.last_rid:

Modified: store/branches/mrg-1.0/cpp/tests/persistence.py
===================================================================
--- store/branches/mrg-1.0/cpp/tests/persistence.py	2008-08-11 22:07:32 UTC (rev 2278)
+++ store/branches/mrg-1.0/cpp/tests/persistence.py	2008-08-12 15:27:18 UTC (rev 2279)
@@ -231,7 +231,7 @@
         if txc.global_id not in ids:    
             self.fail("Recovered xids not as expected. missing: %s" % (txc))
         if txd.global_id not in ids:    
-            self.fail("Recovered xids not as expected. missing: %s" % (txc))
+            self.fail("Recovered xids not as expected. missing: %s" % (txd))
         self.assertEqual(2, len(xids))    
 
 




More information about the rhmessaging-commits mailing list