[rhmessaging-commits] rhmessaging commits: r2297 - in store/branches/mrg-1.0/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Aug 13 15:24:21 EDT 2008


Author: kpvdr
Date: 2008-08-13 15:24:20 -0400 (Wed, 13 Aug 2008)
New Revision: 2297

Modified:
   store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
   store/branches/mrg-1.0/cpp/lib/PreparedTransaction.h
   store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
   store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
   store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py
Log:
Additional fixes for BZ458053 "txtest failures when broker killed during transfer phase".

Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-08-13 17:44:19 UTC (rev 2296)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp	2008-08-13 19:24:20 UTC (rev 2297)
@@ -576,6 +576,8 @@
             }
             if (incomplTplTxnFlag) {
                 opcc->complete(citr->second.commit_flag);
+            } else {
+                completed(*opcc.get(), citr->second.commit_flag);
             }
         }
     }
@@ -722,7 +724,6 @@
                                       txn_list& prepared,
                                       message_index& messages)
 {
-//std::cout << "***** recoverMessages(): queue=" << queue->getName() << std::endl;
     size_t preambleLength = sizeof(u_int32_t)/*header size*/;
 
     JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
@@ -783,7 +784,10 @@
                     std::string xid(i->xid);
                     TplRecoverMapCitr citr = tplRecoverMap.find(xid);
                     if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
-                    if (citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
+
+                    // deq present in prepared list, this xid is part of incomplete txn commit/abort
+                    // or this is a 1PC txn that must be rolled forward
+                    if (citr->second.deq_flag || !citr->second.tpc_flag) {
                         if (jc->is_enqueued(rid, true)) {
                             // Enqueue is non-tx, dequeue tx
                             assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
@@ -939,7 +943,11 @@
                     unsigned enqCnt = 0;
                     unsigned deqCnt = 0;
                     u_int64_t rid = 0;
-                    bool commitFlag = false;
+
+                    // Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists.
+                    // Note: will apply to both 1PC and 2PC transactions.
+                    bool commitFlag = true;
+
                     for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
                         if (j->_enq_flag) {
                             rid = j->_rid;
@@ -1412,6 +1420,10 @@
     try {
         chkTplStoreInit(); // Late initialize (if needed)
 
+        // This sync is required to ensure multi-queue atomicity - ie all txn data
+        // must hit the disk on *all* queues before the TPL prepare (enq) is written.
+        ctxt->sync();
+
         ctxt->incrDtokRef();
         DataTokenImpl* dtokp = ctxt->getDtok();
         dtokp->set_external_rid(true);

Modified: store/branches/mrg-1.0/cpp/lib/PreparedTransaction.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/PreparedTransaction.h	2008-08-13 17:44:19 UTC (rev 2296)
+++ store/branches/mrg-1.0/cpp/lib/PreparedTransaction.h	2008-08-13 19:24:20 UTC (rev 2297)
@@ -47,6 +47,7 @@
 
     void add(queue_id queue, message_id message);
     bool isLocked(queue_id queue, message_id message);
+    std::size_t size() { return locked.size(); }
     iterator begin() { return locked.begin(); }
     iterator end() { return locked.end(); }
 

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp	2008-08-13 17:44:19 UTC (rev 2296)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp	2008-08-13 19:24:20 UTC (rev 2297)
@@ -640,8 +640,20 @@
             for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end(); itr++)
             {
                 std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list_ptr->begin(), prep_txn_list_ptr->end(), *itr);
-                if (pitr == prep_txn_list_ptr->end())
-                    _tmap.get_remove_tdata_list(*itr);
+                if (pitr == prep_txn_list_ptr->end()) // not found in prepared list
+                {
+                    txn_data_list tdl = _tmap.get_remove_tdata_list(*itr);
+                    // Unlock any affected enqueues in emap
+                    for (tdl_itr i=tdl.begin(); i<tdl.end(); i++)
+                    {
+                        if (i->_enq_flag) // enq op - decrement enqueue count
+                            rd._enq_cnt_list[i->_fid]--;
+                        else if (_emap.is_enqueued(i->_drid, true)) // deq op - unlock enq record
+                            _emap.unlock(i->_drid);
+                    }
+                    // Write abort record to disk
+                    rcvr_write_abort(rd, *itr);
+                }
             }
         }
     }
@@ -957,7 +969,7 @@
             ofsp.write((const char*)buff, JRNL_DBLK_SIZE);
             assert(!ofsp.fail());
             std::ostringstream oss;
-            oss << "Wrote filler record at offs=0x" << std::hex << file_pos << std::dec;
+            oss << "Recover phase write: Wrote filler record at offs=0x" << std::hex << file_pos << std::dec;
             this->log(LOG_NOTICE, oss.str());
             file_pos = ofsp.tellp();
         }
@@ -967,5 +979,99 @@
     }
 }
 
+// TODO - FIXME - Unify the recover and normal aio write methods.
+// Normally, writes are not performed during recover mode (journal is in read-only
+// mode) so initialization of the aio write controllers is deferred until recover
+// is complete. Currently because journal is still in recover mode when
+// rcvr_write_abort() is called, normal writes are not possible, so std::ofstream
+// writes are used instead. Lots of logic duplication!
+void
+jcntl::rcvr_write_abort(rcvdat& rd, std::string xid)
+{
+    const u_int32_t sblk_size = JRNL_DBLK_SIZE * JRNL_SBLK_SIZE;
+    const u_int32_t amagic = RHM_JDAT_TXA_MAGIC;
+    const u_int32_t xmagic = RHM_JDAT_EMPTY_MAGIC;
+
+    // Check last record ends on sblk boundary
+    assert(rd._eo % sblk_size == 0);
+    
+    // Find fid and posn to write
+    u_int16_t fid = rd._lfid;
+    std::streampos file_pos = rd._eo;
+    if (rd._eo/sblk_size >= _jfsize_sblks) // file full, use next file
+    {
+        if (++fid >= _num_jfiles)
+        {
+            fid = 0;
+            rd._owi = !rd._owi;
+        }
+        file_pos = 0;
+    }
+
+    // Prepare a buffer of at least 1 sblock
+    u_int32_t abort_dblks = txn_rec::size_dblks(sizeof(txn_hdr) + xid.size());
+    std::size_t buffsize = abort_dblks < JRNL_SBLK_SIZE ? JRNL_SBLK_SIZE : abort_dblks;
+    void* buff = std::malloc(buffsize * JRNL_DBLK_SIZE);
+    assert(buff != 0);
+
+    // Initialize file stream
+    std::ostringstream fn;
+    fn << _jdir.dirname() << "/" << _base_filename << ".";
+    fn << std::hex << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
+    std::ofstream ofs(fn.str().c_str(),
+            std::ios_base::in | std::ios_base::out | std::ios_base::binary);
+    if (!ofs.good())
+        throw jexception(jerrno::JERR__FILEIO, fn.str(), "jcntl", "rcvr_write_abort");
+    if (file_pos)
+        ofs.seekp(file_pos);
+    else
+    {
+        // New file, write new file header
+        file_hdr fh(RHM_JDAT_FILE_MAGIC, RHM_JDAT_VERSION, ++rd._h_rid, fid, sblk_size, rd._owi, true);
+        ofs.write((const char*)&fh, sizeof(file_hdr));
+        assert(!ofs.fail());
+        // fill remainder of sblk with fill char
+        std::memset((char*)buff, RHM_CLEAN_CHAR, sblk_size - sizeof(file_hdr));
+        ofs.write((const char*)buff, sblk_size - sizeof(file_hdr));
+        assert(!ofs.fail());
+        // log write action
+        std::ostringstream oss;
+        oss << "Recover phase write: File header in fid=" << fid << " at offs=0x0 for txn abort record";
+        this->log(LOG_NOTICE, oss.str());
+        file_pos = ofs.tellp();
+    }
+    
+    // Write abort record
+    txn_rec ar(amagic, ++rd._h_rid, xid.data(), xid.size(), rd._owi);
+    u_int32_t res = ar.encode(buff, 0, abort_dblks);
+    assert(res == abort_dblks);
+    ofs.write((const char*)buff, JRNL_DBLK_SIZE);
+    assert(!ofs.fail());
+    // log write action
+    std::ostringstream oss;
+    oss << "Recover phase write: Aborted unprepared transaction xid=" << xid << " at offs=0x" << std::hex << file_pos << std::dec;
+    this->log(LOG_NOTICE, oss.str());
+    file_pos = ofs.tellp();
+
+    // Prepare filler record
+    std::memcpy(buff, (void*)&xmagic, sizeof(xmagic));
+    std::memset((char*)buff + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
+    // Write filler as many times as needed to get to next sblk boundary
+    while (file_pos % sblk_size)
+    {
+        ofs.write((const char*)buff, JRNL_DBLK_SIZE);
+        assert(!ofs.fail());
+        std::ostringstream oss;
+        oss << "Recover phase write: Wrote filler record at offs=0x" << std::hex << file_pos << std::dec;
+        this->log(LOG_NOTICE, oss.str());
+        file_pos = ofs.tellp();
+    }
+    rd._eo = file_pos;
+
+    // Clean up
+    ofs.close();
+    std::free(buff);
+}
+
 } // namespace journal
 } // namespace rhm

Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp	2008-08-13 17:44:19 UTC (rev 2296)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp	2008-08-13 19:24:20 UTC (rev 2297)
@@ -673,6 +673,8 @@
                 std::streampos& read_pos);
         
         void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset);
+
+        void rcvr_write_abort(rcvdat& rd, std::string xid);
     };
 
 } // namespace journal

Modified: store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py	2008-08-13 17:44:19 UTC (rev 2296)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py	2008-08-13 19:24:20 UTC (rev 2297)
@@ -579,7 +579,8 @@
                                 for rec in self.tmap[hdr.xid]:
                                     if isinstance(rec[1], DeqHdr):
                                         if self.emap[rec[1].deq_rid] != None:
-                                            self.emap[rec[1].deq_rid][2] = False # Unlock enq record
+                                            t = self.emap[rec[1].deq_rid]
+                                            self.emap[rec[1].deq_rid] = (t[0], t[1], False) # Unlock enq record
                             del self.tmap[hdr.xid]
                             if len(mismatched_rids) > 0:
                                 warn = ' (WARNING: transactional dequeues not found in enqueue map; rids=%s)' % mismatched_rids




More information about the rhmessaging-commits mailing list