[rhmessaging-commits] rhmessaging commits: r2297 - in store/branches/mrg-1.0/cpp: lib/jrnl and 1 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Wed Aug 13 15:24:21 EDT 2008
Author: kpvdr
Date: 2008-08-13 15:24:20 -0400 (Wed, 13 Aug 2008)
New Revision: 2297
Modified:
store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
store/branches/mrg-1.0/cpp/lib/PreparedTransaction.h
store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py
Log:
Additional fixes for BZ458053 "txtest failures when broker killed during transfer phase".
Modified: store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-08-13 17:44:19 UTC (rev 2296)
+++ store/branches/mrg-1.0/cpp/lib/BdbMessageStore.cpp 2008-08-13 19:24:20 UTC (rev 2297)
@@ -576,6 +576,8 @@
}
if (incomplTplTxnFlag) {
opcc->complete(citr->second.commit_flag);
+ } else {
+ completed(*opcc.get(), citr->second.commit_flag);
}
}
}
@@ -722,7 +724,6 @@
txn_list& prepared,
message_index& messages)
{
-//std::cout << "***** recoverMessages(): queue=" << queue->getName() << std::endl;
size_t preambleLength = sizeof(u_int32_t)/*header size*/;
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
@@ -783,7 +784,10 @@
std::string xid(i->xid);
TplRecoverMapCitr citr = tplRecoverMap.find(xid);
if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
- if (citr->second.deq_flag) { // deq present in prepared list, this xid is part of incomplete txn commit/abort
+
+ // deq present in prepared list, this xid is part of incomplete txn commit/abort
+ // or this is a 1PC txn that must be rolled forward
+ if (citr->second.deq_flag || !citr->second.tpc_flag) {
if (jc->is_enqueued(rid, true)) {
// Enqueue is non-tx, dequeue tx
assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
@@ -939,7 +943,11 @@
unsigned enqCnt = 0;
unsigned deqCnt = 0;
u_int64_t rid = 0;
- bool commitFlag = false;
+
+ // Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists.
+ // Note: will apply to both 1PC and 2PC transactions.
+ bool commitFlag = true;
+
for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
if (j->_enq_flag) {
rid = j->_rid;
@@ -1412,6 +1420,10 @@
try {
chkTplStoreInit(); // Late initialize (if needed)
+ // This sync is required to ensure multi-queue atomicity - ie all txn data
+ // must hit the disk on *all* queues before the TPL prepare (enq) is written.
+ ctxt->sync();
+
ctxt->incrDtokRef();
DataTokenImpl* dtokp = ctxt->getDtok();
dtokp->set_external_rid(true);
Modified: store/branches/mrg-1.0/cpp/lib/PreparedTransaction.h
===================================================================
--- store/branches/mrg-1.0/cpp/lib/PreparedTransaction.h 2008-08-13 17:44:19 UTC (rev 2296)
+++ store/branches/mrg-1.0/cpp/lib/PreparedTransaction.h 2008-08-13 19:24:20 UTC (rev 2297)
@@ -47,6 +47,7 @@
void add(queue_id queue, message_id message);
bool isLocked(queue_id queue, message_id message);
+ std::size_t size() { return locked.size(); }
iterator begin() { return locked.begin(); }
iterator end() { return locked.end(); }
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp 2008-08-13 17:44:19 UTC (rev 2296)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.cpp 2008-08-13 19:24:20 UTC (rev 2297)
@@ -640,8 +640,20 @@
for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end(); itr++)
{
std::vector<std::string>::const_iterator pitr = std::find(prep_txn_list_ptr->begin(), prep_txn_list_ptr->end(), *itr);
- if (pitr == prep_txn_list_ptr->end())
- _tmap.get_remove_tdata_list(*itr);
+ if (pitr == prep_txn_list_ptr->end()) // not found in prepared list
+ {
+ txn_data_list tdl = _tmap.get_remove_tdata_list(*itr);
+ // Unlock any affected enqueues in emap
+ for (tdl_itr i=tdl.begin(); i<tdl.end(); i++)
+ {
+ if (i->_enq_flag) // enq op - decrement enqueue count
+ rd._enq_cnt_list[i->_fid]--;
+ else if (_emap.is_enqueued(i->_drid, true)) // deq op - unlock enq record
+ _emap.unlock(i->_drid);
+ }
+ // Write abort record to disk
+ rcvr_write_abort(rd, *itr);
+ }
}
}
}
@@ -957,7 +969,7 @@
ofsp.write((const char*)buff, JRNL_DBLK_SIZE);
assert(!ofsp.fail());
std::ostringstream oss;
- oss << "Wrote filler record at offs=0x" << std::hex << file_pos << std::dec;
+ oss << "Recover phase write: Wrote filler record at offs=0x" << std::hex << file_pos << std::dec;
this->log(LOG_NOTICE, oss.str());
file_pos = ofsp.tellp();
}
@@ -967,5 +979,99 @@
}
}
+// TODO - FIXME - Unify the recover and normal aio write methods.
+// Normally, writes are not performed during recover mode (journal is in read-only
+// mode) so initialization of the aio write controllers is deferred until recover
+// is complete. Currently, because the journal is still in recover mode when
+// rcvr_write_abort() is called, normal writes are not possible, so std::ofstream
+// writes are used instead. Lots of logic duplication!
+void
+jcntl::rcvr_write_abort(rcvdat& rd, std::string xid)
+{
+ const u_int32_t sblk_size = JRNL_DBLK_SIZE * JRNL_SBLK_SIZE;
+ const u_int32_t amagic = RHM_JDAT_TXA_MAGIC;
+ const u_int32_t xmagic = RHM_JDAT_EMPTY_MAGIC;
+
+ // Check last record ends on sblk boundary
+ assert(rd._eo % sblk_size == 0);
+
+ // Find fid and posn to write
+ u_int16_t fid = rd._lfid;
+ std::streampos file_pos = rd._eo;
+ if (rd._eo/sblk_size >= _jfsize_sblks) // file full, use next file
+ {
+ if (++fid >= _num_jfiles)
+ {
+ fid = 0;
+ rd._owi = !rd._owi;
+ }
+ file_pos = 0;
+ }
+
+ // Prepare a buffer of at least 1 sblock
+ u_int32_t abort_dblks = txn_rec::size_dblks(sizeof(txn_hdr) + xid.size());
+ std::size_t buffsize = abort_dblks < JRNL_SBLK_SIZE ? JRNL_SBLK_SIZE : abort_dblks;
+ void* buff = std::malloc(buffsize * JRNL_DBLK_SIZE);
+ assert(buff != 0);
+
+ // Initialize file stream
+ std::ostringstream fn;
+ fn << _jdir.dirname() << "/" << _base_filename << ".";
+ fn << std::hex << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
+ std::ofstream ofs(fn.str().c_str(),
+ std::ios_base::in | std::ios_base::out | std::ios_base::binary);
+ if (!ofs.good())
+ throw jexception(jerrno::JERR__FILEIO, fn.str(), "jcntl", "rcvr_write_abort");
+ if (file_pos)
+ ofs.seekp(file_pos);
+ else
+ {
+ // New file, write new file header
+ file_hdr fh(RHM_JDAT_FILE_MAGIC, RHM_JDAT_VERSION, ++rd._h_rid, fid, sblk_size, rd._owi, true);
+ ofs.write((const char*)&fh, sizeof(file_hdr));
+ assert(!ofs.fail());
+ // fill remainder of sblk with fill char
+ std::memset((char*)buff, RHM_CLEAN_CHAR, sblk_size - sizeof(file_hdr));
+ ofs.write((const char*)buff, sblk_size - sizeof(file_hdr));
+ assert(!ofs.fail());
+ // log write action
+ std::ostringstream oss;
+ oss << "Recover phase write: File header in fid=" << fid << " at offs=0x0 for txn abort record";
+ this->log(LOG_NOTICE, oss.str());
+ file_pos = ofs.tellp();
+ }
+
+ // Write abort record
+ txn_rec ar(amagic, ++rd._h_rid, xid.data(), xid.size(), rd._owi);
+ u_int32_t res = ar.encode(buff, 0, abort_dblks);
+ assert(res == abort_dblks);
+ ofs.write((const char*)buff, JRNL_DBLK_SIZE);
+ assert(!ofs.fail());
+ // log write action
+ std::ostringstream oss;
+ oss << "Recover phase write: Aborted unprepared transaction xid=" << xid << " at offs=0x" << std::hex << file_pos << std::dec;
+ this->log(LOG_NOTICE, oss.str());
+ file_pos = ofs.tellp();
+
+ // Prepare filler record
+ std::memcpy(buff, (void*)&xmagic, sizeof(xmagic));
+ std::memset((char*)buff + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
+ // Write filler as many times as needed to get to next sblk boundary
+ while (file_pos % sblk_size)
+ {
+ ofs.write((const char*)buff, JRNL_DBLK_SIZE);
+ assert(!ofs.fail());
+ std::ostringstream oss;
+ oss << "Recover phase write: Wrote filler record at offs=0x" << std::hex << file_pos << std::dec;
+ this->log(LOG_NOTICE, oss.str());
+ file_pos = ofs.tellp();
+ }
+ rd._eo = file_pos;
+
+ // Clean up
+ ofs.close();
+ std::free(buff);
+}
+
} // namespace journal
} // namespace rhm
Modified: store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp 2008-08-13 17:44:19 UTC (rev 2296)
+++ store/branches/mrg-1.0/cpp/lib/jrnl/jcntl.hpp 2008-08-13 19:24:20 UTC (rev 2297)
@@ -673,6 +673,8 @@
std::streampos& read_pos);
void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset);
+
+ void rcvr_write_abort(rcvdat& rd, std::string xid);
};
} // namespace journal
Modified: store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py 2008-08-13 17:44:19 UTC (rev 2296)
+++ store/branches/mrg-1.0/cpp/tests/jrnl/jtt/jfile_chk.py 2008-08-13 19:24:20 UTC (rev 2297)
@@ -579,7 +579,8 @@
for rec in self.tmap[hdr.xid]:
if isinstance(rec[1], DeqHdr):
if self.emap[rec[1].deq_rid] != None:
- self.emap[rec[1].deq_rid][2] = False # Unlock enq record
+ t = self.emap[rec[1].deq_rid]
+ self.emap[rec[1].deq_rid] = (t[0], t[1], False) # Unlock enq record
del self.tmap[hdr.xid]
if len(mismatched_rids) > 0:
warn = ' (WARNING: transactional dequeues not found in enqueue map; rids=%s)' % mismatched_rids
More information about the rhmessaging-commits
mailing list