[rhmessaging-commits] rhmessaging commits: r1611 - in store/trunk/cpp: tests/jrnl/jtt and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Jan 25 14:39:33 EST 2008


Author: kpvdr
Date: 2008-01-25 14:39:32 -0500 (Fri, 25 Jan 2008)
New Revision: 1611

Modified:
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/txn_map.cpp
   store/trunk/cpp/lib/jrnl/txn_map.hpp
   store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_set.cpp
   store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
   store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
   store/trunk/cpp/tests/jrnl/jtt/test_case_set.cpp
   store/trunk/cpp/tests/jrnl/jtt/test_case_set.hpp
   store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp
Log:
Various fixes for recover, especially transactional recover, both in the journal and the test tool.

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2008-01-25 19:39:32 UTC (rev 1611)
@@ -448,7 +448,11 @@
         {
             _wmgr.get_events(pmgr::UNUSED);
             if (cnt++ > MAX_AIO_CMPL_SLEEPS)
+            {
+                // TODO: Log this!
+                std::cout << "**** JERR_JCNTL_AIOCMPLWAIT *** " << _wmgr.status_str() << std::endl;
                 throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
+            }
             ::usleep(AIO_CMPL_SLEEP);
         }
         return true;
@@ -501,7 +505,8 @@
         u_int16_t next_wr_fid = (rd._lfid + 1) % _num_jfiles;
         if (rd._ffid == next_wr_fid && rd._enq_cnt_list[next_wr_fid])
             rd._full = true;
-        
+
+        // Remove all transactions not in prep_txn_list      
 		std::vector<std::string> xid_list;
 		_tmap.xid_list(xid_list);
 		for (std::vector<std::string>::iterator itr = xid_list.begin(); itr != xid_list.end();
@@ -590,21 +595,27 @@
                 ar.get_xid(&xidp);
                 assert(xidp != 0);
                 std::string xid((char*)xidp, ar.xid_size());
-                txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
-                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+                try
                 {
-                    try 
-					{ 
-						if (!itr->_enq_flag)
-							_emap.unlock(itr->_drid);
-					}
-                    catch(const jexception& e)
+                    txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+                    for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
                     {
-                        if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+                        if (itr->_enq_flag)
+                            rd._enq_cnt_list[itr->_fid]--;
+                        else
+                        {
+                            try { _emap.unlock(itr->_drid); }
+                            catch(const jexception& e)
+                            {
+                                if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+                            }
+                        }
                     }
-                    if (itr->_enq_flag)
-                        rd._enq_cnt_list[itr->_fid]--;
                 }
+                catch (const jexception& e)
+                {
+                    if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+                }
                 ::free(xidp);
             }
             break;
@@ -617,17 +628,31 @@
                 cr.get_xid(&xidp);
                 assert(xidp != 0);
                 std::string xid((char*)xidp, cr.xid_size());
-                txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
-                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+                try
                 {
-                    if (itr->_enq_flag) // txn enqueue
-                        _emap.insert_fid(itr->_rid, itr->_fid);
-                    else // txn dequeue
+                    txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+                    for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
                     {
-                        u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
-                        rd._enq_cnt_list[fid]--;
+                        if (itr->_enq_flag) // txn enqueue
+                            _emap.insert_fid(itr->_rid, itr->_fid);
+                        else // txn dequeue
+                        {
+                            try
+                            {
+                                u_int16_t fid = _emap.get_remove_fid(itr->_drid, true);
+                                rd._enq_cnt_list[fid]--;
+                            }
+                            catch (const jexception& e)
+                            {
+                                if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+                            }
+                        }
                     }
                 }
+                catch (const jexception& e)
+                {
+                    if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw;
+                }
                 ::free(xidp);
             }
             break;

Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp	2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp	2008-01-25 19:39:32 UTC (rev 1611)
@@ -89,7 +89,7 @@
     if (itr == _map.end()) // not found in map
     {
         std::ostringstream oss;
-        oss << std::hex << "xid=\"" << xid << "\"";
+        oss << std::hex << "xid=" << xid_format(xid);
         throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "get_tdata_list");
     }
     return itr->second;
@@ -104,7 +104,7 @@
     {
         pthread_mutex_unlock(&_mutex);
         std::ostringstream oss;
-        oss << std::hex << "xid=\"" << xid << "\"";
+        oss << std::hex << "xid=" << xid_format(xid);
         throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map",
                 "get_remove_tdata_list");
     }
@@ -134,7 +134,7 @@
     if (itr == _map.end()) // not found in map
     {
         std::ostringstream oss;
-        oss << std::hex << "xid=\"" << xid << "\"";
+        oss << std::hex << "xid=" << xid_format(xid);
         throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "get_rid_count");
     }
     return itr->second.size();
@@ -149,7 +149,7 @@
     {
         pthread_mutex_unlock(&_mutex);
         std::ostringstream oss;
-        oss << std::hex << "xid=\"" << xid << "\"";
+        oss << std::hex << "xid=" << xid_format(xid);
         throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "is_txn_synced");
     }
     bool is_synced = true;
@@ -190,7 +190,7 @@
     if (ok && !found)
     {
         std::ostringstream oss;
-        oss << std::hex << "xid=\"" << xid << "\" rid=" << rid;
+        oss << std::hex << "xid=" << xid_format(xid) << " rid=" << rid;
         throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "txn_map", "set_aio_compl");
     }
     return ok;
@@ -206,5 +206,17 @@
     pthread_mutex_unlock(&_mutex);
 }
 
+// static fn
+std::string
+txn_map::xid_format(const std::string& xid)
+{
+    if (xid.size() < 100)
+        return xid;
+    std::ostringstream oss;
+    oss << "\"" << xid.substr(0, 20) << " ... " << xid.substr(xid.size() - 20, 20);
+    oss << "\" (size: " << xid.size() << ")";
+    return oss.str();
+}
+
 } // namespace journal
 } // namespace rhm

Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp	2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp	2008-01-25 19:39:32 UTC (rev 1611)
@@ -129,6 +129,8 @@
         inline const bool empty() const { return _map.empty(); }
         inline const u_int16_t size() const { return (u_int16_t)_map.size(); }
         void xid_list(std::vector<std::string>& xv);
+    private:
+        static std::string xid_format(const std::string& xid);
     };
 
 } // namespace journal

Modified: store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_set.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_set.cpp	2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/tests/jrnl/jtt/_ut_test_case_set.cpp	2008-01-25 19:39:32 UTC (rev 1611)
@@ -111,10 +111,16 @@
     test_case_set tcs;
     BOOST_REQUIRE_MESSAGE(check_csv_file(csv_file.c_str()), "Test CSV file \"" << csv_file <<
             "\" is missing.");
-    unsigned num_tcs = tcs.append_from_csv(csv_file);
+    tcs.append_from_csv(csv_file, false);
     BOOST_CHECK(!tcs.empty());
-    BOOST_CHECK_EQUAL(num_tcs, unsigned(44));
     BOOST_CHECK_EQUAL(tcs.size(), unsigned(44));
+    BOOST_CHECK_EQUAL(tcs.ignored(), unsigned(0));
+    tcs.clear();
+    BOOST_CHECK(tcs.empty());
+    tcs.append_from_csv(csv_file, true);
+    BOOST_CHECK(!tcs.empty());
+    BOOST_CHECK_EQUAL(tcs.size(), unsigned(18));
+    BOOST_CHECK_EQUAL(tcs.ignored(), unsigned(26));
 }
 
 

Modified: store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py	2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py	2008-01-25 19:39:32 UTC (rev 1611)
@@ -119,7 +119,7 @@
 
 def split_str(s):
     if len(s) > 25:
-        return s[:10] + ' ... ' + s[-10:]
+        return s[:12] + ' ... ' + s[-10:]
     else:
         return s
 
@@ -435,6 +435,7 @@
         self.file_num = 0
         self.fro = 0x200
         self.enqueued = {}
+        self.rec_cnt = 0
         self.msg_cnt = 0
         self.fhdr = None
         self.f = None
@@ -468,6 +469,8 @@
                 break
             if hdr.check():
                 stop = True;
+            else:
+                self.rec_cnt += 1
             if self.first_rec:
                 if self.fhdr.fro != hdr.foffs:
                     raise Exception('File header first record offset mismatch: fro=0x%08x; rec_offs=0x%08x' % (self.fhdr.fro, hdr.foffs))
@@ -569,7 +572,7 @@
                 tss = fhdr.timestamp_str()
                 owi_found = True
             if not self.qflag:
-                print '  %s: owi=%s rid=%d, fro=0x%08x ts=%s' % (jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str())
+                print '  %s: owi=%s rid=0x%x, fro=0x%08x ts=%s' % (jfn, fhdr.owi(), fhdr.rid, fhdr.fro, fhdr.timestamp_str())
         if fnum < 0 or rid < 0 or fro < 0:
             raise Exception('All journal files empty')
         if not self.qflag: print '  Oldest complete file: %s: rid=%d, fro=0x%08x ts=%s' % (fname, rid, fro, tss)
@@ -736,7 +739,7 @@
                 for h in self.enqueued:
                     print self.enqueued[h]
                 print 'WARNING: Enqueue-Dequeue mismatch, %d enqueued records remain.' % len(self.enqueued)
-            print 'Test passed; %d records processed.' % self.msg_cnt
+            print '%d enqueues, %d journal records processed.' % (self.msg_cnt, self.rec_cnt)
 
 
 #===============================================================================

Modified: store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp	2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/tests/jrnl/jtt/jrnl_instance.cpp	2008-01-25 19:39:32 UTC (rev 1611)
@@ -78,6 +78,8 @@
 jrnl_instance::init_tc(test_case::shared_ptr& tcp, const bool recover_mode, const bool keep_jrnls)
         throw ()
 {
+    test_case_result::shared_ptr p(new test_case_result(_jpp->jid()));
+    _tcrp = p;
     try
     {
         _tcp = tcp;
@@ -93,22 +95,19 @@
             {
             std::vector<std::string> prep_txn_list;
             u_int64_t highest_rid;
-            recover(0, 0, prep_txn_list, highest_rid);
+            recover(aio_rd_callback, aio_wr_callback, prep_txn_list, highest_rid);
             recover_complete();
             }
             catch (const rhm::journal::jexception& e)
             {
                 if (e.err_code() == rhm::journal::jerrno::JERR_JDIR_STAT)
-                    initialize(0, 0);
+                    initialize(aio_rd_callback, aio_wr_callback);
                 else
                     throw;
             }
         }
         else
             initialize(aio_rd_callback, aio_wr_callback);
-
-        test_case_result::shared_ptr p(new test_case_result(_jpp->jid()));
-        _tcrp = p;
     }
     catch (const rhm::journal::jexception& e) { _tcrp->add_exception(e); }
     catch (const std::exception& e) { _tcrp->add_exception(e.what()); }
@@ -189,7 +188,7 @@
                 {
                     std::ostringstream oss;
                     oss << "ERROR: Timeout waiting for journal \"" << _jid;
-                    oss << "\" to empty.";
+                    oss << "\" to empty. (RHM_IORES_ENQCAPTHRESH)";
                     _tcrp->add_exception(oss.str());
                 }
                 else

Modified: store/trunk/cpp/tests/jrnl/jtt/test_case_set.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_case_set.cpp	2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/tests/jrnl/jtt/test_case_set.cpp	2008-01-25 19:39:32 UTC (rev 1611)
@@ -24,6 +24,7 @@
 #include "test_case_set.hpp"
 
 #include <fstream>
+#include <iostream>
 
 namespace rhm
 {
@@ -31,13 +32,16 @@
 {
 
 test_case_set::test_case_set():
-        _tc_list()
+        _tc_list(),
+        _csv_ignored(0)
 {}
 
-test_case_set::test_case_set(const std::string& csv_filename, const csv_map& cols):
-        _tc_list()
+test_case_set::test_case_set(const std::string& csv_filename, const bool recover_mode,
+        const csv_map& cols):
+        _tc_list(),
+        _csv_ignored(0)
 {
-    append_from_csv(csv_filename, cols);
+    append_from_csv(csv_filename, recover_mode, cols);
 }
 
 test_case_set::~test_case_set()
@@ -57,10 +61,10 @@
 
 
 #define CSV_BUFF_SIZE 2048
-unsigned
-test_case_set::append_from_csv(const std::string& csv_filename, const csv_map& cols)
+void
+test_case_set::append_from_csv(const std::string& csv_filename, const bool recover_mode,
+        const csv_map& cols)
 {
-    unsigned num_tc = 0;
     char buff[CSV_BUFF_SIZE];
     std::ifstream ifs(csv_filename.c_str());
     while (ifs.good())
@@ -71,12 +75,13 @@
             test_case::shared_ptr tcp = get_tc_from_csv(buff, cols);
             if (tcp.get())
             {
-                append(tcp);
-                num_tc++;
+                if (!recover_mode || tcp->auto_deq())
+                    append(tcp);
+                else
+                    _csv_ignored++;
             }
         }
     }
-    return num_tc;
 }
 
 test_case::shared_ptr

Modified: store/trunk/cpp/tests/jrnl/jtt/test_case_set.hpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_case_set.hpp	2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/tests/jrnl/jtt/test_case_set.hpp	2008-01-25 19:39:32 UTC (rev 1611)
@@ -64,13 +64,16 @@
     private:
         tcl _tc_list;
         static bool _map_init;
+        unsigned _csv_ignored;
 
     public:
         test_case_set();
-        test_case_set(const std::string& csv_filename, const csv_map& cols = std_csv_map);
+        test_case_set(const std::string& csv_filename, const bool recover_mode,
+                const csv_map& cols = std_csv_map);
         virtual ~test_case_set();
 
          inline const unsigned size() const { return _tc_list.size(); }
+         inline const unsigned ignored() const { return _csv_ignored; }
          inline const bool empty() const { return _tc_list.empty(); }
 
         inline void append(const test_case::shared_ptr& tc) { _tc_list.push_back(tc); }
@@ -79,10 +82,12 @@
                 const size_t min_xid_size, const size_t max_xid_size,
                 const test_case::transient_t transient,
                 const test_case::external_t external, const std::string& comment);
-        unsigned append_from_csv(const std::string& csv_filename, const csv_map& cols = std_csv_map);
+        void append_from_csv(const std::string& csv_filename, const bool recover_mode,
+                const csv_map& cols = std_csv_map);
         inline tcl_itr begin() { return _tc_list.begin(); }
         inline tcl_itr end() { return _tc_list.end(); }
         inline const test_case::shared_ptr& operator[](unsigned i) { return _tc_list[i]; }
+        inline void clear() { _tc_list.clear(); }
 
     private:
         test_case::shared_ptr get_tc_from_csv(const std::string& csv_line, const csv_map& cols);

Modified: store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp	2008-01-25 16:50:04 UTC (rev 1610)
+++ store/trunk/cpp/tests/jrnl/jtt/test_mgr.cpp	2008-01-25 19:39:32 UTC (rev 1611)
@@ -52,11 +52,15 @@
     const bool summary = true;
 
     std::cout << "CSV file: \"" << _opts.test_case_csv_file_name << "\"";
-    test_case_set tcs(_opts.test_case_csv_file_name);
+    test_case_set tcs(_opts.test_case_csv_file_name, _opts.recover_mode);
     const unsigned num_test_cases = tcs.size();
     if (num_test_cases)
     {
-        std::cout << " (containing " << tcs.size() << " test cases)" << std::endl;
+        std::cout << " (found " << tcs.size() << " test case" << (tcs.size() != 1 ? "s" : "") <<
+                ")" << std::endl;
+        if (tcs.ignored())
+            std::cout << "WARNING: " << tcs.ignored() << " test cases were ignored. "
+                    "(recover-mode selected and test has no auto-dequeue.)" << std::endl;
         _opts.print_flags();
     }
     else
@@ -132,7 +136,7 @@
     {
         std::ostringstream jid;
         jid << std::hex << std::setfill('0');
-        jid << "test_" << std::setw(2) << std::hex << i;
+        jid << "test_" << std::setw(4) << std::hex << i;
         std::ostringstream jdir;
         jdir << "/tmp/" << jid.str();
         jrnl_init_params::shared_ptr jpp(new jrnl_init_params(jid.str(), jdir.str(), jid.str()));




More information about the rhmessaging-commits mailing list