[rhmessaging-commits] rhmessaging commits: r1002 - in store/trunk/cpp: tests/jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Oct 10 22:41:16 EDT 2007


Author: kpvdr
Date: 2007-10-10 22:41:16 -0400 (Wed, 10 Oct 2007)
New Revision: 1002

Modified:
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/rcvdat.hpp
   store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
Log:
Fixed bug where highest rid was not correctly identified after restore; added check for prepared xid list during resore. This more-or-less completes the transaction code for the journal. There still seems to be some memory leaks around, though, and transaction integration testing is still not complete.

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-10 21:36:41 UTC (rev 1001)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-11 02:41:16 UTC (rev 1002)
@@ -33,6 +33,7 @@
 
 #include <jrnl/jcntl.hpp>
 
+#include <algorithm>
 #include <cerrno>
 #include <fstream>
 #include <iomanip>
@@ -372,7 +373,7 @@
 
 const bool
 jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
-        const std::vector<std::string>& /*prep_txn_list*/) throw (jexception)
+        const std::vector<std::string>& prep_txn_list) throw (jexception)
 {
     u_int32_t dblks_read = 0;
     bool done = false;
@@ -391,22 +392,23 @@
                     done = er.rcv_decode(h, ifsp, dblks_read);
                     jfile_cycle(fid, ifsp, rd, false);
                 }
+                rd._enq_cnt_list[fid]++;
                 if (er.xid_size())
                 {
 //std::cout << "$" << std::flush;
                     er.get_xid(&xidp);
                     assert(xidp != NULL);
                     std::string xid((char*)xidp, er.xid_size());
-                    _tmap.insert_txn_data(xid, txn_data(h._rid, fid, true));
+                    std::vector<std::string>::const_iterator cit = std::find(prep_txn_list.begin(),
+                            prep_txn_list.end(), xid);
+                    if (cit != prep_txn_list.end())
+                        _tmap.insert_txn_data(xid, txn_data(h._rid, fid, true));
                     free(xidp);
                 }
                 else
-                {
                     _emap.insert_fid(h._rid, fid);
-                    rd._enq_cnt_list[fid]++;
-                    if (rd._h_rid < h._rid)
-                        rd._h_rid = h._rid;
-                }
+                if (rd._h_rid < h._rid)
+                    rd._h_rid = h._rid;
             }
             break;
         case RHM_JDAT_DEQ_MAGIC:
@@ -427,7 +429,10 @@
                     dr.get_xid(&xidp);
                     assert(xidp != NULL);
                     std::string xid((char*)xidp, dr.xid_size());
-                    _tmap.insert_txn_data(xid, txn_data(h._rid, fid, false));
+                    std::vector<std::string>::const_iterator cit = std::find(prep_txn_list.begin(),
+                            prep_txn_list.end(), xid);
+                    if (cit != prep_txn_list.end())
+                        _tmap.insert_txn_data(xid, txn_data(h._rid, fid, false));
                     free(xidp);
                 }
                 else
@@ -442,9 +447,9 @@
                         if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
                             throw e;
                     }
-                    if (rd._h_rid < h._rid)
-                        rd._h_rid = h._rid;
                 }
+                if (rd._h_rid < h._rid)
+                    rd._h_rid = h._rid;
             }
             break;
         case RHM_JDAT_TXA_MAGIC:
@@ -460,15 +465,26 @@
                 ar.get_xid(&xidp);
                 assert(xidp != NULL);
                 std::string xid((char*)xidp, ar.xid_size());
-                txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
-                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+                std::vector<std::string>::const_iterator cit = std::find(prep_txn_list.begin(),
+                        prep_txn_list.end(), xid);
+                if (cit != prep_txn_list.end())
                 {
-                    try { _emap.unlock(itr->_rid); }
-                    catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
-                    if (itr->_enq_flag)
-                        _wrfc.decr_enqcnt(itr->_fid);
+                    txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+                    for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+                    {
+                        try { _emap.unlock(itr->_rid); }
+                        catch(jexception e)
+                        {
+                            if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+                                throw e;
+                        }
+                        if (itr->_enq_flag)
+                            _wrfc.decr_enqcnt(itr->_fid);
+                    }
                 }
                 free(xidp);
+                if (rd._h_rid < h._rid)
+                    rd._h_rid = h._rid;
             }
             break;
         case RHM_JDAT_TXC_MAGIC:
@@ -484,18 +500,25 @@
                 cr.get_xid(&xidp);
                 assert(xidp != NULL);
                 std::string xid((char*)xidp, cr.xid_size());
-                txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
-                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+                std::vector<std::string>::const_iterator cit = std::find(prep_txn_list.begin(),
+                        prep_txn_list.end(), xid);
+                if (cit != prep_txn_list.end())
                 {
-                    if (itr->_enq_flag) // txn enqueue
-                        _emap.insert_fid(itr->_rid, itr->_fid);
-                    else // txn dequeue
+                    txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+                    for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
                     {
-                        u_int16_t fid = _emap.get_remove_fid(h._rid);
-                        _wrfc.decr_enqcnt(fid);
+                        if (itr->_enq_flag) // txn enqueue
+                            _emap.insert_fid(itr->_rid, itr->_fid);
+                        else // txn dequeue
+                        {
+                            u_int16_t fid = _emap.get_remove_fid(h._rid);
+                            _wrfc.decr_enqcnt(fid);
+                        }
                     }
                 }
                 free(xidp);
+                if (rd._h_rid < h._rid)
+                    rd._h_rid = h._rid;
             }
             break;
         case RHM_JDAT_EMPTY_MAGIC:
@@ -565,131 +588,6 @@
     return true;
 }
 
-// const bool
-// jcntl::rcvr_fanalyze(u_int16_t fid, rcvdat& rd, const std::vector<std::string>& /*prep_txn_list*/)
-//         throw (jexception)
-// {
-//     bool eoj = false;
-//     std::stringstream ss;
-//     ss << _jdir.dirname() << "/" << _base_filename << ".";
-//     ss << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
-// //std::cout << "rcvr_fanalyze: " << ss.str() << ":";
-//     std::ifstream jifs(ss.str().c_str());
-//     if (!jifs.good())
-//         throw jexception(jerrno::JERR__FILEIO, ss.str(), "jinf", "analyze");
-//     
-//     // 1. Read file header
-//     file_hdr fhdr;
-//     jifs.read((char*)&fhdr, sizeof(fhdr));
-//     if (fhdr._hdr._magic == RHM_JDAT_FILE_MAGIC)
-//     {
-//         assert(fhdr._fid == fid);
-//         if (!rd._fro)
-//             rd._fro = fhdr._fro;
-//         std::streamoff foffs = fhdr._fro;
-//         jifs.seekg(foffs);
-//     
-//         // 2. Read file records
-//         while (jifs.good() && !eoj)
-//         {
-//             hdr h;
-//             jifs.read((char*)&h, sizeof(hdr));
-//             switch(h._magic)
-//             {
-//             case RHM_JDAT_ENQ_MAGIC:
-//                 {
-//                     size_t xidsize = 0;
-//                     size_t recsize = 0;
-// #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
-//                     jifs.ignore(sizeof(u_int32_t));
-// #endif
-//                     jifs.read((char*)&xidsize, sizeof(size_t));
-// #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
-//                     jifs.ignore(sizeof(u_int32_t));
-// #endif
-// #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
-//                     jifs.ignore(sizeof(u_int32_t));
-// #endif
-//                     jifs.read((char*)&recsize, sizeof(size_t));
-// #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
-//                     jifs.ignore(sizeof(u_int32_t));
-// #endif
-//                     _emap.insert_fid(h._rid, fid);
-//                     rd._enq_cnt_list[fid]++;
-//                     if (rd._h_rid < h._rid)
-//                         rd._h_rid = h._rid;
-// //std::cout << " e" << h._rid;
-//                     u_int32_t rec_dblks = jrec::size_dblks((size_t)recsize + sizeof(enq_hdr) +
-//                             sizeof(rec_tail));
-//                     foffs += rec_dblks * JRNL_DBLK_SIZE;
-//                     jifs.seekg(foffs);
-//                 }
-//                 break;
-//             case RHM_JDAT_DEQ_MAGIC:
-//                 {
-//                     u_int64_t drid = 0;
-//                     size_t xidsize = 0;
-//                     jifs.read((char*)&drid, sizeof(u_int64_t));
-// #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
-//                     jifs.ignore(sizeof(u_int32_t));
-// #endif
-//                     jifs.read((char*)&xidsize, sizeof(size_t));
-// #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
-//                     jifs.ignore(sizeof(u_int32_t));
-// #endif
-//                     try
-//                     {
-//                         _emap.get_remove_fid(drid);
-//                         rd._enq_cnt_list[fid]--;
-//                     }
-//                     catch (jexception& e) {} // ignore JERR_EMAP_NOTFOUND thrown here
-//                     if (rd._h_rid < h._rid)
-//                         rd._h_rid = h._rid;
-// //std::cout << " d" << drid << ")";
-//                     u_int32_t rec_dblks = jrec::size_dblks(sizeof(deq_hdr));
-//                     foffs += rec_dblks * JRNL_DBLK_SIZE;
-//                     jifs.seekg(foffs);
-//                 }
-//                 break;
-//             case RHM_JDAT_TXA_MAGIC:
-// //std::cout << " a";
-//                 break;
-//             case RHM_JDAT_TXC_MAGIC:
-// //std::cout << " c";
-//                 break;
-//             case RHM_JDAT_EMPTY_MAGIC:
-//                 {
-// //std::cout << " x";
-//                     u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
-//                     foffs += rec_dblks * JRNL_DBLK_SIZE;
-//                     jifs.seekg(foffs);
-//                 }
-//                 break;
-//             case 0:
-//                 rd._lfid = fid;
-//                 rd._eo = foffs;
-//                 if (!jifs.eof())
-//                     eoj = true;
-// //std::cout << (jifs.eof()?" <eof>":" <end>");
-//                 break;
-//             default:
-//                 std::stringstream ss;
-//                 ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
-//                 throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str(), "jcntl",
-//                         "rcvr_fanalyze");
-//             }
-//         }
-//     }
-//     else
-//     {
-//         eoj = true;
-// //std::cout << " <empty>";
-//     }
-//     jifs.close();
-// //std::cout << std::endl;
-//     return eoj;
-// }
-
 void
 jcntl::aio_wr_callback(jcntl*  journal, u_int32_t num_dtoks)
 {

Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp	2007-10-10 21:36:41 UTC (rev 1001)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp	2007-10-11 02:41:16 UTC (rev 1002)
@@ -42,10 +42,6 @@
 
         struct rcvdat
         {
-            typedef std::vector<u_int64_t> rid_list;
-            typedef std::pair<rid_list, rid_list> enq_deq_rid_list;
-            typedef std::map<std::string, enq_deq_rid_list> enq_deq_map;
-            
             bool _empty;        ///< Journal data files empty
             u_int16_t _ffid;    ///< First file id
             size_t _fro;        ///< First record offset in ffid
@@ -53,7 +49,6 @@
             size_t _eo;         ///< End offset (first byte past last record)
             u_int64_t _h_rid;   ///< Highest rid found
             std::vector<u_int32_t> _enq_cnt_list; ///< Number enqueued records found for each file
-            enq_deq_map _edm;   ///< Map of enqueue and dequeue rids for each xid
 
             rcvdat():
                     _empty(true),
@@ -62,8 +57,7 @@
                     _lfid(0),
                     _eo(0),
                     _h_rid(0),
-                    _enq_cnt_list(JRNL_NUM_FILES, 0),
-                    _edm()
+                    _enq_cnt_list(JRNL_NUM_FILES, 0)
             {}
             void reset()
             {
@@ -75,8 +69,18 @@
                 _h_rid=0;
                 for (unsigned f=0; f<_enq_cnt_list.size(); f++)
                     _enq_cnt_list[f] = 0;
-                _edm.clear();
             }
+            void print()
+            {
+                std::cout << "_empty=" << (_empty?"T":"F") << std::endl;
+                std::cout << "_ffid=" << _ffid << std::endl;
+                std::cout << "_fro=" << _fro << std::endl;
+                std::cout << "_lfid=" << _lfid << std::endl;
+                std::cout << "_eo=" << _eo << std::endl;
+                std::cout << "_h_rid=" << _h_rid << std::endl;
+                for (unsigned i=0; i<_enq_cnt_list.size(); i++)
+                    std::cout << "_enq_cnt_list[" << i << "]=" << _enq_cnt_list[i] << std::endl;
+            }
         };
 }
 }

Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-10-10 21:36:41 UTC (rev 1001)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-10-11 02:41:16 UTC (rev 1002)
@@ -54,8 +54,9 @@
     CPPUNIT_TEST(RecoveredReadTest);
     CPPUNIT_TEST(TxnRecoveredReadTest);
     CPPUNIT_TEST(RecoveredDequeueTest);
-//    CPPUNIT_TEST(TxnRecoveredDequeueTest);
+    CPPUNIT_TEST(TxnRecoveredDequeueTest);
     CPPUNIT_TEST(ComplexRecoveryTest1);
+    CPPUNIT_TEST(TxnComplexRecoveryTest1);
     CPPUNIT_TEST(EncodeTest_000);
     CPPUNIT_TEST(EncodeTest_001);
     CPPUNIT_TEST(EncodeTest_002);
@@ -390,6 +391,7 @@
                 rhm::journal::jcntl jc(test_name, "jdata", test_name);
                 jc.initialize();
                 create_xid(xid, 1, XID_SIZE);
+                txn_list.push_back(xid);
                 for (int m=0; m<NUM_MSGS; m++)
                     enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
                 txn_commit(&jc, xid);
@@ -548,6 +550,7 @@
                 rhm::journal::jcntl jc(test_name, "jdata", test_name);
                 jc.initialize();
                 create_xid(xid, 2, XID_SIZE);
+                txn_list.push_back(xid);
                 for (int m=0; m<NUM_MSGS; m++)
                     enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
                 txn_commit(&jc, xid);
@@ -725,6 +728,7 @@
                 rhm::journal::jcntl jc(test_name, "jdata", test_name);
                 jc.initialize();
                 create_xid(xid, 3, XID_SIZE);
+                txn_list.push_back(xid);
                 for (int m=0; m<NUM_MSGS; m++)
                     enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
                 txn_commit(&jc, xid);
@@ -947,6 +951,160 @@
         }
     }
 
+    void TxnComplexRecoveryTest1()
+    {
+        std::vector<std::string> txn_list;
+        //Stack
+        char* test_name = "TxnComplexRecoveryTest1_Stack";
+        try
+        {
+            {
+                rhm::journal::jcntl jc(test_name, "jdata", test_name);
+                jc.initialize();
+                // rids: 0 to NUM_MSGS - 1
+                for (int m=0; m<NUM_MSGS; m++)
+                    enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
+                // rids: NUM_MSGS to NUM_MSGS*2 - 1
+                create_xid(xid, 4, XID_SIZE);
+                txn_list.push_back(xid);
+                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+                    enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
+                // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
+                for (int m=0; m<NUM_MSGS; m++)
+                    deq_msg(&jc, m);
+                // rid: NUM_MSGS*3
+                txn_commit(&jc, xid);
+                jc.flush();
+                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+                {
+                    read_msg(&jc);
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+            }
+            {
+                rhm::journal::jcntl jc(test_name, "jdata", test_name);
+                jc.recover(txn_list);
+                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+                {
+                    read_msg(&jc);
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+                jc.recover_complete();
+                // rids: NUM_MSGS*3+1 to NUM_MSGS*4
+                for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+                    enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
+                jc.flush();
+                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+                {
+                    read_msg(&jc);
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+                for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+                {
+                    read_msg(&jc);
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+                // rids: NUM_MSGS*4+1 to NUM_MSGS*6
+                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+                    deq_msg(&jc, m);
+                for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+                    deq_msg(&jc, m);
+            }
+        }
+        catch (rhm::journal::jexception& e)
+        {
+            std::stringstream ss;
+            ss << e;
+            CPPUNIT_FAIL(ss.str());
+        }
+        // Heap
+        test_name = "TxnComplexRecoveryTest1_Heap";
+        rhm::journal::jcntl* jcp = NULL;
+        try
+        {
+            {
+                jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+                jcp->initialize();
+                // rids: 0 to NUM_MSGS*2 - 1
+                for (int m=0; m<NUM_MSGS; m++)
+                    enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
+                create_xid(xid, 4, XID_SIZE);
+                // rids: NUM_MSGS to NUM_MSGS*2 - 1
+                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+                    enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid);
+                // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
+                for (int m=0; m<NUM_MSGS; m++)
+                    deq_msg(jcp, m);
+                // rid: NUM_MSGS*3
+                txn_commit(jcp, xid);
+                jcp->flush();
+                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+                {
+                    read_msg(jcp);
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+                delete jcp;
+                jcp = NULL;
+            }
+            {
+                rhm::journal::jcntl* jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+                jcp->recover(txn_list);
+                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+                {
+                    read_msg(jcp);
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+                jcp->recover_complete();
+                // rids: NUM_MSGS*3+1 to NUM_MSGS*4
+                for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+                    enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
+                jcp->flush();
+                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+                {
+                    read_msg(jcp);
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+                for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+                {
+                    read_msg(jcp);
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+                // rids: NUM_MSGS*4+1 to NUM_MSGS*6
+                for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
+                    deq_msg(jcp, m);
+                for (int m=NUM_MSGS*3+1; m<NUM_MSGS*4+1; m++)
+                    deq_msg(jcp, m);
+                delete jcp;
+            }
+        }
+        catch (rhm::journal::jexception& e)
+        {
+            if (jcp)
+                delete jcp;
+            std::stringstream ss;
+            ss << e;
+            CPPUNIT_FAIL(ss.str());
+        }
+    }
+
     void EncodeTest_000()
     {
         runEncodeTest(0, 0, 0, false, 0, 0, false, 2, "Empty journal");




More information about the rhmessaging-commits mailing list