[rhmessaging-commits] rhmessaging commits: r1001 - in store/trunk/cpp: tests/jrnl and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Oct 10 17:36:41 EDT 2007


Author: kpvdr
Date: 2007-10-10 17:36:41 -0400 (Wed, 10 Oct 2007)
New Revision: 1001

Modified:
   store/trunk/cpp/lib/jrnl/deq_rec.cpp
   store/trunk/cpp/lib/jrnl/deq_rec.hpp
   store/trunk/cpp/lib/jrnl/enq_rec.cpp
   store/trunk/cpp/lib/jrnl/enq_rec.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/jrec.hpp
   store/trunk/cpp/lib/jrnl/rcvdat.hpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/txn_rec.cpp
   store/trunk/cpp/lib/jrnl/txn_rec.hpp
   store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
   store/trunk/cpp/tests/jrnl/rtest
Log:
Transactional recovery is now almost complete, but there is still some testing to be done. Correlation with the prepared xid list is still missing.

Modified: store/trunk/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.cpp	2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/deq_rec.cpp	2007-10-10 21:36:41 UTC (rev 1001)
@@ -84,7 +84,7 @@
     _buff = NULL;
 }
 
-u_int32_t
+const u_int32_t
 deq_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) throw (jexception)
 {
     assert(wptr != NULL);
@@ -196,7 +196,7 @@
     return size_dblks(wr_cnt);
 }
 
-u_int32_t
+const u_int32_t
 deq_rec::decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
         throw (jexception)
 {
@@ -318,8 +318,57 @@
     return size_dblks(rd_cnt);
 }
 
+const bool 
+deq_rec::rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks) throw (jexception)
+{ // Recovery-only decode: h was already read by the caller; the rest of the record comes from ifsp
+    if (rec_offs_dblks) // Continue decoding xid from previous decode call
+    {
+        // TODO: continuation is not implemented; this path falls through and returns false
+    }
+    else // Start at beginning of record
+    {
+        _deq_hdr._hdr.copy(h);
+        ifsp->read((char*)&_deq_hdr._deq_rid, sizeof(u_int64_t));
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+        ifsp->ignore(sizeof(u_int32_t)); // _filler0
+#endif
+        ifsp->read((char*)&_deq_hdr._xidsize, sizeof(size_t));
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+        ifsp->ignore(sizeof(u_int32_t)); // _filler0
+#endif
+        if (_deq_hdr._xidsize)
+        { // NOTE(review): _buff is assigned without freeing any earlier buffer - confirm it is NULL here
+            _buff = ::malloc(_deq_hdr._xidsize);
+            if (_buff == NULL)
+            { // NOTE(review): the exception below names decode, not rcv_decode - confirm this is intended
+                std::stringstream ss;
+                ss << "_buff malloc(): errno=" << errno;
+                throw jexception(jerrno::JERR__MALLOC, ss.str(), "deq_rec", "decode");
+            }
+            // Decode xid: read _xidsize bytes from the stream into the newly allocated _buff
+            ifsp->read((char*)_buff, _deq_hdr._xidsize);
+            if ((size_t)ifsp->gcount() == _deq_hdr._xidsize)
+            { // Whole xid was read: skip the remainder of the record (rec_offs_dblks is not updated here)
+                ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_deq_hdr) -
+                        _deq_hdr._xidsize);
+                return true;
+            }
+            else
+                ; // TODO: a short read of the xid is not handled; falls through and returns false
+        }
+        else
+        {
+            // Ignore rest of record; rec_offs_dblks is set to rec_size_dblks() on this path only
+            rec_offs_dblks = rec_size_dblks();
+            ifsp->ignore(rec_offs_dblks * JRNL_DBLK_SIZE - sizeof(_deq_hdr));
+            return true;
+        }
+    }
+    return false; // Reached only through the TODO paths above; true means the record was consumed
+}
+
 const size_t
-deq_rec::get_xid(const void** const xidpp)
+deq_rec::get_xid(void** const xidpp)
 {
     if (!_buff)
     {

Modified: store/trunk/cpp/lib/jrnl/deq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.hpp	2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/deq_rec.hpp	2007-10-10 21:36:41 UTC (rev 1001)
@@ -73,11 +73,15 @@
         // Prepare instance for use in writing data to journal
         void reset(const  u_int64_t rid, const  u_int64_t drid, const void* const xidp,
                 const size_t xidlen);
-        u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+        const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
                 throw (jexception);
-        u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+        const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
+                u_int32_t max_size_dblks) throw (jexception);
+        // Decode used for recover
+        const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
                 throw (jexception);
-        const size_t get_xid(const void** const xidpp);
+        inline const u_int64_t deq_rid() const { return _deq_hdr._deq_rid; }
+        const size_t get_xid(void** const xidpp);
         std::string& str(std::string& str) const;
         inline const size_t data_size() const { return 0; } // This record never carries data
         const size_t xid_size() const;

Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp	2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp	2007-10-10 21:36:41 UTC (rev 1001)
@@ -98,7 +98,7 @@
     _enq_tail._rid = rid;
 }
 
-u_int32_t
+const u_int32_t
 enq_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) throw (jexception)
 {
     assert(wptr != NULL);
@@ -240,7 +240,7 @@
     return size_dblks(wr_cnt);
 }
 
-u_int32_t
+const u_int32_t
 enq_rec::decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
         throw (jexception)
 {
@@ -427,6 +427,60 @@
     return size_dblks(rd_cnt);
 }
 
+const bool
+enq_rec::rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks) throw (jexception)
+{ // Recovery-only decode: h was already read by the caller; the rest of the record comes from ifsp
+    if (rec_offs_dblks) // Continue decoding xid from previous decode call
+    {
+        // TODO: continuation is not implemented; this path falls through and returns false
+    }
+    else // Start at beginning of record
+    {
+        _enq_hdr._hdr.copy(h);
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+        ifsp->ignore(sizeof(u_int32_t)); // _filler0
+#endif
+        ifsp->read((char*)&_enq_hdr._xidsize, sizeof(size_t));
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+        ifsp->ignore(sizeof(u_int32_t)); // _filler0
+#endif
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+        ifsp->ignore(sizeof(u_int32_t)); // _filler1
+#endif
+        ifsp->read((char*)&_enq_hdr._dsize, sizeof(size_t));
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+        ifsp->ignore(sizeof(u_int32_t)); // _filler1
+#endif
+        if (_enq_hdr._xidsize)
+        { // NOTE(review): _buff is assigned without freeing any earlier buffer - confirm it is NULL here
+            _buff = ::malloc(_enq_hdr._xidsize);
+            if (_buff == NULL)
+            { // NOTE(review): the exception below names deq_rec/decode inside enq_rec - likely copy-paste
+                std::stringstream ss;
+                ss << "_buff malloc(): errno=" << errno;
+                throw jexception(jerrno::JERR__MALLOC, ss.str(), "deq_rec", "decode");
+            }
+            // Decode xid: read _xidsize bytes from the stream into the newly allocated _buff
+            ifsp->read((char*)_buff, _enq_hdr._xidsize);
+            if ((size_t)ifsp->gcount() == _enq_hdr._xidsize)
+            { // Whole xid was read: everything after it in the record is skipped, not decoded
+                ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_enq_hdr) -
+                        _enq_hdr._xidsize);
+                return true;
+            }
+            else
+                ; // TODO: a short read of the xid is not handled; falls through and returns false
+        }
+        else
+        {
+            // Ignore rest of record (rec_offs_dblks is left untouched on every path of this function)
+            ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_enq_hdr));
+            return true;
+        }
+    }
+    return false; // Reached only through the TODO paths above; true means the record was consumed
+}
+
 const size_t
 enq_rec::get_xid(void** const xidpp)
 {

Modified: store/trunk/cpp/lib/jrnl/enq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.hpp	2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/enq_rec.hpp	2007-10-10 21:36:41 UTC (rev 1001)
@@ -84,9 +84,12 @@
         void reset(const u_int64_t rid, const void* const dbuf, const size_t dlen,
                 const void* const xidp, const size_t xidlen, bool transient);
 
-        u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+        const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
                 throw (jexception);
-        u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+        const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
+                u_int32_t max_size_dblks) throw (jexception);
+        // Decode used for recover
+        const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
                 throw (jexception);
         const size_t get_xid(void** const xidpp);
         const size_t get_data(void** const datapp);

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-10 21:36:41 UTC (rev 1001)
@@ -361,143 +361,335 @@
             throw e;
     }
 
-    // Restore all read and write pointers
+    // Restore all read and write pointers and transactions
     if (!rd._empty)
     {
-        bool eoj = false;
-        for (u_int16_t fnum=0; fnum<JRNL_NUM_FILES && !eoj; fnum++)
-        {
-            u_int16_t fid = (fnum + rd._ffid) % JRNL_NUM_FILES;
-            eoj = rcvr_fanalyze(fid, rd, prep_txn_list);
-        }
+        u_int16_t fid = rd._ffid;
+        std::ifstream ifs;
+        while (rcvr_get_next_record(fid, &ifs, rd, prep_txn_list));
     }
 }
 
 const bool
-jcntl::rcvr_fanalyze(u_int16_t fid, rcvdat& rd, const std::vector<std::string>& /*prep_txn_list*/)
-        throw (jexception)
+jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
+        const std::vector<std::string>& /*prep_txn_list*/) throw (jexception) // handles one record per call
 {
-    bool eoj = false;
-    std::stringstream ss;
-    ss << _jdir.dirname() << "/" << _base_filename << ".";
-    ss << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
-//std::cout << "rcvr_fanalyze: " << ss.str() << ":";
-    std::ifstream jifs(ss.str().c_str());
-    if (!jifs.good())
-        throw jexception(jerrno::JERR__FILEIO, ss.str(), "jinf", "analyze");
-    
-    // 1. Read file header
-    file_hdr fhdr;
-    jifs.read((char*)&fhdr, sizeof(fhdr));
-    if (fhdr._hdr._magic == RHM_JDAT_FILE_MAGIC)
+    u_int32_t dblks_read = 0; // passed to rcv_decode() as its rec_offs_dblks in/out argument
+    bool done = false; // result of rcv_decode(): true once the record has been fully consumed
+    void* xidp = NULL; // filled in by get_xid(); released with free() after use
+    hdr h; // record header read below; h._magic selects the case
+    jfile_cycle(fid, ifsp, rd, true); // NOTE(review): the bool result is ignored - confirm
+    ifsp->read((char*)&h, sizeof(hdr));
+    switch(h._magic)
     {
-        assert(fhdr._fid == fid);
-        if (!rd._fro)
-            rd._fro = fhdr._fro;
-        std::streamoff foffs = fhdr._fro;
-        jifs.seekg(foffs);
-    
-        // 2. Read file records
-        while (jifs.good() && !eoj)
-        {
-            hdr h;
-            jifs.read((char*)&h, sizeof(hdr));
-            switch(h._magic)
+        case RHM_JDAT_ENQ_MAGIC:
             {
-            case RHM_JDAT_ENQ_MAGIC:
+//std::cout << " e" << h._rid << std::flush;
+                enq_rec er;
+                while (!done) // NOTE(review): no exit if rcv_decode() keeps returning false - confirm
                 {
-                    size_t xidsize = 0;
-                    size_t recsize = 0;
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
-                    jifs.ignore(sizeof(u_int32_t));
-#endif
-                    jifs.read((char*)&xidsize, sizeof(size_t));
-#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
-                    jifs.ignore(sizeof(u_int32_t));
-#endif
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
-                    jifs.ignore(sizeof(u_int32_t));
-#endif
-                    jifs.read((char*)&recsize, sizeof(size_t));
-#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
-                    jifs.ignore(sizeof(u_int32_t));
-#endif
+                    done = er.rcv_decode(h, ifsp, dblks_read);
+                    jfile_cycle(fid, ifsp, rd, false);
+                }
+                if (er.xid_size()) // transactional enqueue: held in _tmap, not yet added to _emap
+                {
+//std::cout << "$" << std::flush;
+                    er.get_xid(&xidp);
+                    assert(xidp != NULL);
+                    std::string xid((char*)xidp, er.xid_size());
+                    _tmap.insert_txn_data(xid, txn_data(h._rid, fid, true));
+                    free(xidp); // NOTE(review): assumes get_xid() hands over ownership of the buffer
+                }
+                else // non-transactional enqueue
+                {
                     _emap.insert_fid(h._rid, fid);
                     rd._enq_cnt_list[fid]++;
                     if (rd._h_rid < h._rid)
                         rd._h_rid = h._rid;
-//std::cout << " e" << h._rid;
-                    u_int32_t rec_dblks = jrec::size_dblks((size_t)recsize + sizeof(enq_hdr) +
-                            sizeof(rec_tail));
-                    foffs += rec_dblks * JRNL_DBLK_SIZE;
-                    jifs.seekg(foffs);
                 }
-                break;
-            case RHM_JDAT_DEQ_MAGIC:
+            }
+            break;
+        case RHM_JDAT_DEQ_MAGIC:
+            {
+//std::cout << " d" << h._rid << std::flush;
+                deq_rec dr;
+                while (!done)
                 {
-                    u_int64_t drid = 0;
-                    size_t xidsize = 0;
-                    jifs.read((char*)&drid, sizeof(u_int64_t));
-#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
-                    jifs.ignore(sizeof(u_int32_t));
-#endif
-                    jifs.read((char*)&xidsize, sizeof(size_t));
-#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
-                    jifs.ignore(sizeof(u_int32_t));
-#endif
+                    done = dr.rcv_decode(h, ifsp, dblks_read);
+                    jfile_cycle(fid, ifsp, rd, false);
+                }
+                if (dr.xid_size()) // transactional dequeue: held in _tmap
+                { // NOTE(review): h._rid is this record's own rid; the else branch uses dr.deq_rid()
+//std::cout << "$" << std::flush;
+                    // If the enqueue is part of a pending txn, it will not yet be in emap
+                    try { _emap.lock(h._rid); }
+                    catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
+                    dr.get_xid(&xidp);
+                    assert(xidp != NULL);
+                    std::string xid((char*)xidp, dr.xid_size());
+                    _tmap.insert_txn_data(xid, txn_data(h._rid, fid, false));
+                    free(xidp);
+                }
+                else // non-transactional dequeue: remove the referenced enqueue from _emap
+                {
                     try
                     {
-                        _emap.get_remove_fid(drid);
+                        _emap.get_remove_fid(dr.deq_rid());
                         rd._enq_cnt_list[fid]--;
                     }
-                    catch (jexception& e) {} // ignore JERR_EMAP_NOTFOUND thrown here
+                    catch (jexception& e)
+                    {
+                        if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+                            throw e; // any error other than JERR_MAP_NOTFOUND propagates
+                    }
                     if (rd._h_rid < h._rid)
                         rd._h_rid = h._rid;
-//std::cout << " d" << drid << ")";
-                    u_int32_t rec_dblks = jrec::size_dblks(sizeof(deq_hdr));
-                    foffs += rec_dblks * JRNL_DBLK_SIZE;
-                    jifs.seekg(foffs);
                 }
-                break;
-            case RHM_JDAT_TXA_MAGIC:
-//std::cout << " a";
-                break;
-            case RHM_JDAT_TXC_MAGIC:
-//std::cout << " c";
-                break;
-            case RHM_JDAT_EMPTY_MAGIC:
+            }
+            break;
+        case RHM_JDAT_TXA_MAGIC:
+            {
+//std::cout << " a" << h._rid << std::flush;
+                txn_rec ar;
+                while (!done)
                 {
-//std::cout << " x";
-                    u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
-                    foffs += rec_dblks * JRNL_DBLK_SIZE;
-                    jifs.seekg(foffs);
+                    done = ar.rcv_decode(h, ifsp, dblks_read);
+                    jfile_cycle(fid, ifsp, rd, false);
                 }
-                break;
-            case 0:
-                rd._lfid = fid;
-                rd._eo = foffs;
-                if (!jifs.eof())
-                    eoj = true;
-//std::cout << (jifs.eof()?" <eof>":" <end>");
-                break;
-            default:
-                std::stringstream ss;
-                ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
-                throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str(), "jcntl",
-                        "rcvr_fanalyze");
+                // Delete this txn from tmap, unlock any locked records in emap
+                ar.get_xid(&xidp);
+                assert(xidp != NULL);
+                std::string xid((char*)xidp, ar.xid_size());
+                txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+                {
+                    try { _emap.unlock(itr->_rid); }
+                    catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
+                    if (itr->_enq_flag)
+                        _wrfc.decr_enqcnt(itr->_fid);
+                }
+                free(xidp);
             }
+            break;
+        case RHM_JDAT_TXC_MAGIC:
+            {
+//std::cout << " c" << h._rid << std::flush;
+                txn_rec cr;
+                while (!done)
+                {
+                    done = cr.rcv_decode(h, ifsp, dblks_read);
+                    jfile_cycle(fid, ifsp, rd, false);
+                }
+                // Delete this txn from tmap, process records into emap
+                cr.get_xid(&xidp);
+                assert(xidp != NULL);
+                std::string xid((char*)xidp, cr.xid_size());
+                txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+                for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+                {
+                    if (itr->_enq_flag) // txn enqueue
+                        _emap.insert_fid(itr->_rid, itr->_fid);
+                    else // txn dequeue
+                    { // NOTE(review): uses h._rid (commit record), not itr->_rid; fid shadows the parameter
+                        u_int16_t fid = _emap.get_remove_fid(h._rid);
+                        _wrfc.decr_enqcnt(fid);
+                    }
+                }
+                free(xidp);
+            }
+            break;
+        case RHM_JDAT_EMPTY_MAGIC:
+            { // skip the bytes that pad the header out to size_dblks(sizeof(hdr)) dblks
+//std::cout << " x" << std::flush;
+                u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
+                ifsp->ignore(rec_dblks * JRNL_DBLK_SIZE - sizeof(hdr));
+            }
+        break;
+        case 0:
+//std::cout << " z" << std::flush;
+            rd._lfid = fid;
+            rd._eo = ifsp->tellg(); // NOTE(review): taken after the hdr read above - confirm offset
+            return false; // zero magic: tells the caller to stop looping
+        default:
+//std::cout << " ?" << std::flush;
+            std::stringstream ss;
+            ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
+            throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str(), "jcntl",
+                    "rcvr_get_next_record");
+    }
+
+    return true; // a record was handled; the caller asks for the next one
+}
+
+const bool
+jcntl::jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd, const bool jump_fro)
+{ // Moves on to the next journal file once the stream is exhausted, and opens file fid if none is open
+    if (ifsp->is_open())
+    {
+        if (ifsp->eof() || !ifsp->good())
+        { // Current file is finished: close it and step fid forward, wrapping at JRNL_NUM_FILES
+            ifsp->close();
+            fid++;
+            if (fid >= JRNL_NUM_FILES)
+                fid = 0;
+            if (fid == rd._ffid) // back at rd._ffid: used up all journal files
+                return false;
         }
     }
-    else
+    if (!ifsp->is_open()) // never opened, or closed just above: open file number fid
     {
-        eoj = true;
-//std::cout << " <empty>";
+        std::stringstream ss;
+        ss << _jdir.dirname() << "/" << _base_filename << ".";
+        ss << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
+        ifsp->open(ss.str().c_str());
+        if (!ifsp->good()) // NOTE(review): the exception below names rcvr_get_next_record - confirm
+            throw jexception(jerrno::JERR__FILEIO, ss.str(), "jcntl", "rcvr_get_next_record");
+
+        // Read file header, then seek to fhdr._fro (jump_fro) or to JRNL_DBLK_SIZE * JRNL_SBLK_SIZE
+        file_hdr fhdr;
+        ifsp->read((char*)&fhdr, sizeof(fhdr));
+        if (fhdr._hdr._magic == RHM_JDAT_FILE_MAGIC)
+        {
+            assert(fhdr._fid == fid);
+            if (!rd._fro) // only recorded while rd._fro is still 0
+                rd._fro = fhdr._fro;
+            std::streamoff foffs = jump_fro ? fhdr._fro : JRNL_DBLK_SIZE * JRNL_SBLK_SIZE;
+            ifsp->seekg(foffs);
+        }
+        else
+        { // File magic mismatch: close the stream again and report failure
+            ifsp->close();
+            return false;
+        }
     }
-    jifs.close();
-//std::cout << std::endl;
-    return eoj;
+    return true; // a file is open (either still usable from before, or freshly opened and positioned)
 }
 
+// const bool
+// jcntl::rcvr_fanalyze(u_int16_t fid, rcvdat& rd, const std::vector<std::string>& /*prep_txn_list*/)
+//         throw (jexception)
+// {
+//     bool eoj = false;
+//     std::stringstream ss;
+//     ss << _jdir.dirname() << "/" << _base_filename << ".";
+//     ss << std::setfill('0') << std::setw(4) << fid << "." << JRNL_DATA_EXTENSION;
+// //std::cout << "rcvr_fanalyze: " << ss.str() << ":";
+//     std::ifstream jifs(ss.str().c_str());
+//     if (!jifs.good())
+//         throw jexception(jerrno::JERR__FILEIO, ss.str(), "jinf", "analyze");
+//     
+//     // 1. Read file header
+//     file_hdr fhdr;
+//     jifs.read((char*)&fhdr, sizeof(fhdr));
+//     if (fhdr._hdr._magic == RHM_JDAT_FILE_MAGIC)
+//     {
+//         assert(fhdr._fid == fid);
+//         if (!rd._fro)
+//             rd._fro = fhdr._fro;
+//         std::streamoff foffs = fhdr._fro;
+//         jifs.seekg(foffs);
+//     
+//         // 2. Read file records
+//         while (jifs.good() && !eoj)
+//         {
+//             hdr h;
+//             jifs.read((char*)&h, sizeof(hdr));
+//             switch(h._magic)
+//             {
+//             case RHM_JDAT_ENQ_MAGIC:
+//                 {
+//                     size_t xidsize = 0;
+//                     size_t recsize = 0;
+// #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+//                     jifs.ignore(sizeof(u_int32_t));
+// #endif
+//                     jifs.read((char*)&xidsize, sizeof(size_t));
+// #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+//                     jifs.ignore(sizeof(u_int32_t));
+// #endif
+// #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+//                     jifs.ignore(sizeof(u_int32_t));
+// #endif
+//                     jifs.read((char*)&recsize, sizeof(size_t));
+// #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+//                     jifs.ignore(sizeof(u_int32_t));
+// #endif
+//                     _emap.insert_fid(h._rid, fid);
+//                     rd._enq_cnt_list[fid]++;
+//                     if (rd._h_rid < h._rid)
+//                         rd._h_rid = h._rid;
+// //std::cout << " e" << h._rid;
+//                     u_int32_t rec_dblks = jrec::size_dblks((size_t)recsize + sizeof(enq_hdr) +
+//                             sizeof(rec_tail));
+//                     foffs += rec_dblks * JRNL_DBLK_SIZE;
+//                     jifs.seekg(foffs);
+//                 }
+//                 break;
+//             case RHM_JDAT_DEQ_MAGIC:
+//                 {
+//                     u_int64_t drid = 0;
+//                     size_t xidsize = 0;
+//                     jifs.read((char*)&drid, sizeof(u_int64_t));
+// #if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+//                     jifs.ignore(sizeof(u_int32_t));
+// #endif
+//                     jifs.read((char*)&xidsize, sizeof(size_t));
+// #if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+//                     jifs.ignore(sizeof(u_int32_t));
+// #endif
+//                     try
+//                     {
+//                         _emap.get_remove_fid(drid);
+//                         rd._enq_cnt_list[fid]--;
+//                     }
+//                     catch (jexception& e) {} // ignore JERR_EMAP_NOTFOUND thrown here
+//                     if (rd._h_rid < h._rid)
+//                         rd._h_rid = h._rid;
+// //std::cout << " d" << drid << ")";
+//                     u_int32_t rec_dblks = jrec::size_dblks(sizeof(deq_hdr));
+//                     foffs += rec_dblks * JRNL_DBLK_SIZE;
+//                     jifs.seekg(foffs);
+//                 }
+//                 break;
+//             case RHM_JDAT_TXA_MAGIC:
+// //std::cout << " a";
+//                 break;
+//             case RHM_JDAT_TXC_MAGIC:
+// //std::cout << " c";
+//                 break;
+//             case RHM_JDAT_EMPTY_MAGIC:
+//                 {
+// //std::cout << " x";
+//                     u_int32_t rec_dblks = jrec::size_dblks(sizeof(hdr));
+//                     foffs += rec_dblks * JRNL_DBLK_SIZE;
+//                     jifs.seekg(foffs);
+//                 }
+//                 break;
+//             case 0:
+//                 rd._lfid = fid;
+//                 rd._eo = foffs;
+//                 if (!jifs.eof())
+//                     eoj = true;
+// //std::cout << (jifs.eof()?" <eof>":" <end>");
+//                 break;
+//             default:
+//                 std::stringstream ss;
+//                 ss << std::hex << std::setfill('0') << "Magic=0x" << std::setw(8) << h._magic;
+//                 throw jexception(jerrno::JERR_JCNTL_UNKNOWNMAGIC, ss.str(), "jcntl",
+//                         "rcvr_fanalyze");
+//             }
+//         }
+//     }
+//     else
+//     {
+//         eoj = true;
+// //std::cout << " <empty>";
+//     }
+//     jifs.close();
+// //std::cout << std::endl;
+//     return eoj;
+// }
+
 void
 jcntl::aio_wr_callback(jcntl*  journal, u_int32_t num_dtoks)
 {

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-10-10 21:36:41 UTC (rev 1001)
@@ -611,16 +611,21 @@
         /**
         * \brief Analyze journal for recovery.
         */
-        void rcvr_janalyze(rcvdat& jrs, const std::vector<std::string>& prep_txn_list)
+        void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list)
                 throw (jexception);
 
+        const bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
+                const std::vector<std::string>& prep_txn_list) throw (jexception);
+
+        const bool jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
+                const bool jump_fro);
         /**
         * \brief Analyze a particular journal file for recovery.
         *
         * \return <b><i>true</i></b> if end of journal (eoj) found; <b><i>false</i></b> otherwise.
         */
-        const bool rcvr_fanalyze(u_int16_t fid, rcvdat& jrs,
-                const std::vector<std::string>& prep_txn_list) throw (jexception);
+//         const bool rcvr_fanalyze(u_int16_t fid, rcvdat& jrs,
+//                 const std::vector<std::string>& prep_txn_list) throw (jexception);
 
 	/**
 	* Intenal callback write

Modified: store/trunk/cpp/lib/jrnl/jrec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jrec.hpp	2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/jrec.hpp	2007-10-10 21:36:41 UTC (rev 1001)
@@ -41,6 +41,7 @@
 }
 }
 
+#include <fstream>
 #include <string>
 #include <sys/types.h>
 #include <jrnl/file_hdr.hpp>
@@ -113,8 +114,8 @@
         * \param max_size_dblks Maximum number of data-blocks to write to pointer wptr.
         * \returns Number of data-blocks encoded.
         */
-        virtual u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
-                throw (jexception) = 0;
+        virtual const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks,
+                u_int32_t max_size_dblks) throw (jexception) = 0;
 
         /**
         * \brief Decode into this instance of jrec from the read buffer at the disk-block-aligned
@@ -146,9 +147,12 @@
         * \param max_size_dblks Maximum number of data-blocks to read from pointer rptr.
         * \returns Number of data-blocks read (consumed).
         */
-        virtual u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
+        virtual const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
                 u_int32_t max_size_dblks) throw (jexception) = 0;
 
+        virtual const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
+                throw (jexception) = 0;
+
         virtual std::string& str(std::string& str) const = 0;
         virtual const size_t data_size() const = 0;
         virtual const size_t xid_size() const = 0;

Modified: store/trunk/cpp/lib/jrnl/rcvdat.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rcvdat.hpp	2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/rcvdat.hpp	2007-10-10 21:36:41 UTC (rev 1001)
@@ -62,7 +62,7 @@
                     _lfid(0),
                     _eo(0),
                     _h_rid(0),
-                    _enq_cnt_list(JRNL_NUM_FILES),
+                    _enq_cnt_list(JRNL_NUM_FILES, 0),
                     _edm()
             {}
             void reset()
@@ -73,7 +73,8 @@
                 _lfid=0;
                 _eo=0;
                 _h_rid=0;
-                _enq_cnt_list.clear();
+                for (unsigned f=0; f<_enq_cnt_list.size(); f++)
+                    _enq_cnt_list[f] = 0;
                 _edm.clear();
             }
         };

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-10-10 21:36:41 UTC (rev 1001)
@@ -250,7 +250,7 @@
     while (true)
     {
 //std::string s;
-//std::cout << " [f pi=" << _pg_index << " d=" << dblks_rem() << " f=" << (_rrfc.empty()?"T":"F") << " status:" << _rrfc.file_handle()->status_str(s) << "]" << std::flush;
+//std::cout << " [f pi=" << _pg_index << " d=" << dblks_rem() << " c=" << (_rrfc.is_compl()?"T":"F") << " o=" << (_rrfc.is_wr_aio_outstanding()?"T":"F") << " status:" << _rrfc.file_handle()->status_str(s) << "]" << std::flush;
         if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
         {
             aio_cycle();   // check if any AIOs have returned

Modified: store/trunk/cpp/lib/jrnl/txn_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.cpp	2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/txn_rec.cpp	2007-10-10 21:36:41 UTC (rev 1001)
@@ -88,7 +88,7 @@
     _txn_tail._rid = rid;
 }
 
-u_int32_t
+const u_int32_t
 txn_rec::encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks) throw (jexception)
 {
     assert(wptr != NULL);
@@ -196,7 +196,7 @@
     return size_dblks(wr_cnt);
 }
 
-u_int32_t
+const u_int32_t
 txn_rec::decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
         throw (jexception)
 {
@@ -313,8 +313,45 @@
     return size_dblks(rd_cnt);
 }
 
+const bool 
+txn_rec::rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks) throw (jexception)
+{ // Recovery-only decode: h was already read by the caller; the rest of the record comes from ifsp
+    if (rec_offs_dblks) // Continue decoding xid from previous decode call
+    {
+        // TODO: continuation is not implemented; this path falls through and returns false
+    }
+    else // Start at beginning of record
+    {
+        _txn_hdr._hdr.copy(h);
+#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
+        ifsp->ignore(sizeof(u_int32_t)); // _filler0
+#endif
+        ifsp->read((char*)&_txn_hdr._xidsize, sizeof(size_t));
+#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
+        ifsp->ignore(sizeof(u_int32_t)); // _filler0
+#endif
+        _buff = ::malloc(_txn_hdr._xidsize);
+        if (_buff == NULL)
+        { // NOTE(review): also taken if _xidsize is 0 and malloc(0) returns NULL - confirm xid is never empty
+            std::stringstream ss; // NOTE(review): the throw below names deq_rec/decode, not txn_rec
+            ss << "_buff malloc(): errno=" << errno;
+            throw jexception(jerrno::JERR__MALLOC, ss.str(), "deq_rec", "decode");
+        }
+        // Decode xid: read _xidsize bytes from the stream into the newly allocated _buff
+        ifsp->read((char*)_buff, _txn_hdr._xidsize);
+        if ((size_t)ifsp->gcount() == _txn_hdr._xidsize)
+        { // Whole xid was read: skip the remainder of the record (rec_offs_dblks is not updated)
+            ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_txn_hdr) - _txn_hdr._xidsize);
+            return true;
+        }
+        else
+            ; // TODO: a short read of the xid is not handled; falls through and returns false
+    }
+    return false; // Reached only through the TODO paths above; true means the record was consumed
+}
+
 const size_t
-txn_rec::get_xid(const void** const xidpp)
+txn_rec::get_xid(void** const xidpp)
 {
     if (!_buff)
     {

Modified: store/trunk/cpp/lib/jrnl/txn_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.hpp	2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/lib/jrnl/txn_rec.hpp	2007-10-10 21:36:41 UTC (rev 1001)
@@ -73,11 +73,14 @@
         // Prepare instance for use in writing data to journal
         void reset(const u_int32_t magic, const  u_int64_t rid, const void* const xidp,
                 const size_t xidlen);
-        u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+        const u_int32_t encode(void* wptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
                 throw (jexception);
-        u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks, u_int32_t max_size_dblks)
+        const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
+                u_int32_t max_size_dblks) throw (jexception);
+        // Decode used for recover
+        const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
                 throw (jexception);
-        const size_t get_xid(const void** const xidpp);
+        const size_t get_xid(void** const xidpp);
         std::string& str(std::string& str) const;
         inline const size_t data_size() const { return 0; } // This record never carries data
         const size_t xid_size() const;

Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-10-10 21:36:41 UTC (rev 1001)
@@ -38,6 +38,8 @@
 #define NUM_MSGS 5
 #define MAX_AIO_SLEEPS 500
 #define AIO_SLEEP_TIME 1000
+#define MSG_SIZE 100
+#define XID_SIZE 64
 
 class JournalSystemTests : public CppUnit::TestCase  
 {
@@ -46,9 +48,13 @@
     CPPUNIT_TEST(InitializationTest);
     CPPUNIT_TEST(EmptyRecoverTest);
     CPPUNIT_TEST(EnqueueTest);
+    CPPUNIT_TEST(TxnEnqueueTest);
     CPPUNIT_TEST(RecoverReadTest);
+    CPPUNIT_TEST(TxnRecoverReadTest);
     CPPUNIT_TEST(RecoveredReadTest);
+    CPPUNIT_TEST(TxnRecoveredReadTest);
     CPPUNIT_TEST(RecoveredDequeueTest);
+//    CPPUNIT_TEST(TxnRecoveredDequeueTest);
     CPPUNIT_TEST(ComplexRecoveryTest1);
     CPPUNIT_TEST(EncodeTest_000);
     CPPUNIT_TEST(EncodeTest_001);
@@ -83,6 +89,7 @@
 
     jtest t;
     std::string msg;
+    std::string xid;
     void* mbuff;
     size_t msize;
     void* xidbuff;
@@ -230,7 +237,7 @@
             rhm::journal::jcntl jc(test_name, "jdata", test_name);
             jc.initialize();
             for (int m=0; m<NUM_MSGS; m++)
-                enq_msg(&jc, create_msg(msg, m));
+                enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
         }
         catch (rhm::journal::jexception& e)
         {
@@ -247,7 +254,7 @@
             CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
             jcp->initialize();
             for (int m=0; m<NUM_MSGS; m++)
-                enq_msg(jcp, create_msg(msg, m));
+                enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
             delete jcp;
         }
         catch (rhm::journal::jexception& e)
@@ -260,6 +267,48 @@
         }
     }
 
+    void TxnEnqueueTest() // Enqueue NUM_MSGS messages under one xid, then commit that xid
+    {
+        // Stack: journal object with automatic storage
+        char* test_name = "TxnEnqueueTest_Stack";
+        try
+        {
+            rhm::journal::jcntl jc(test_name, "jdata", test_name);
+            jc.initialize();
+            create_xid(xid, 0, XID_SIZE); // Fills member xid; the heap section below reuses it
+            for (int m=0; m<NUM_MSGS; m++)
+                enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
+            txn_commit(&jc, xid);
+        }
+        catch (rhm::journal::jexception& e)
+        {
+            std::stringstream ss;
+            ss << e;
+            CPPUNIT_FAIL(ss.str()); // Report the journal exception text as the test failure
+        }
+        // Heap: same sequence with a journal object created by new
+        test_name = "TxnEnqueueTest_Heap";
+        rhm::journal::jcntl* jcp = NULL;
+        try
+        {
+            jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+            CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+            jcp->initialize();
+            for (int m=0; m<NUM_MSGS; m++)
+                enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid);
+            txn_commit(jcp, xid);
+            delete jcp;
+        }
+        catch (rhm::journal::jexception& e)
+        {
+            if (jcp) // Release the journal if the exception arrived before the delete above
+                delete jcp;
+            std::stringstream ss;
+            ss << e;
+            CPPUNIT_FAIL(ss.str());
+        }
+    }
+
     void RecoverReadTest()
     {
         std::vector<std::string> txn_list;
@@ -271,7 +320,7 @@
                 rhm::journal::jcntl jc(test_name, "jdata", test_name);
                 jc.initialize();
                 for (int m=0; m<NUM_MSGS; m++)
-                    enq_msg(&jc, create_msg(msg, m));
+                    enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
             }
             {
                 rhm::journal::jcntl jc(test_name, "jdata", test_name);
@@ -280,7 +329,7 @@
                 {
                     read_msg(&jc);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m).compare(string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
             }
@@ -301,7 +350,7 @@
                 CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
                 jcp->initialize();
                 for (int m=0; m<NUM_MSGS; m++)
-                    enq_msg(jcp, create_msg(msg, m));
+                    enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
                 delete jcp;
                 jcp = NULL;
             }
@@ -314,7 +363,7 @@
                     read_msg(jcp);
                     std::string msg((char*)mbuff, msize);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
                 delete jcp;
@@ -330,6 +379,79 @@
         }
     }
 
+    void TxnRecoverReadTest() // Commit NUM_MSGS txn messages, recover in a new journal, read back
+    {
+        std::vector<std::string> txn_list;
+        // Stack: journal objects with automatic storage
+        char* test_name = "TxnRecoverReadTest_Stack";
+        try
+        {
+            { // Phase 1: write and commit, then let the journal go out of scope
+                rhm::journal::jcntl jc(test_name, "jdata", test_name);
+                jc.initialize();
+                create_xid(xid, 1, XID_SIZE); // Fills member xid; the heap section below reuses it
+                for (int m=0; m<NUM_MSGS; m++)
+                    enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
+                txn_commit(&jc, xid);
+            }
+            { // Phase 2: new journal object on the same names, recover and compare contents
+                rhm::journal::jcntl jc(test_name, "jdata", test_name);
+                jc.recover(txn_list);
+                for (int m=0; m<NUM_MSGS; m++)
+                {
+                    read_msg(&jc);
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+            }
+        }
+        catch (rhm::journal::jexception& e)
+        {
+            std::stringstream ss;
+            ss << e;
+            CPPUNIT_FAIL(ss.str());
+        }
+        // Heap: same sequence with journal objects created by new
+        test_name = "TxnRecoverReadTest_Heap";
+        rhm::journal::jcntl* jcp = NULL;
+        try
+        {
+            {
+                jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+                jcp->initialize();
+                for (int m=0; m<NUM_MSGS; m++)
+                    enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid);
+                txn_commit(jcp, xid);
+                delete jcp;
+                jcp = NULL;
+            }
+            {
+                jcp = new rhm::journal::jcntl(test_name, "jdata", test_name); // Outer jcp, so the catch below can free it
+                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+                jcp->recover(txn_list);
+                for (int m=0; m<NUM_MSGS; m++)
+                {
+                    read_msg(jcp);
+                    std::string msg((char*)mbuff, msize); // Local msg shadows the member; create_msg() overwrites it
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+                delete jcp;
+            }
+        }
+        catch (rhm::journal::jexception& e)
+        {
+            if (jcp)
+                delete jcp;
+            std::stringstream ss;
+            ss << e;
+            CPPUNIT_FAIL(ss.str());
+        }
+    }
+
     void RecoveredReadTest()
     {
         std::vector<std::string> txn_list;
@@ -341,7 +463,7 @@
                 rhm::journal::jcntl jc(test_name, "jdata", test_name);
                 jc.initialize();
                 for (int m=0; m<NUM_MSGS; m++)
-                    enq_msg(&jc, create_msg(msg, m));
+                    enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
             }
             {
                 rhm::journal::jcntl jc(test_name, "jdata", test_name);
@@ -350,7 +472,7 @@
                 {
                     read_msg(&jc);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
                 jc.recover_complete();
@@ -358,7 +480,7 @@
                 {
                     read_msg(&jc);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
             }
@@ -379,7 +501,7 @@
                 CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
                 jcp->initialize();
                 for (int m=0; m<NUM_MSGS; m++)
-                    enq_msg(jcp, create_msg(msg, m));
+                    enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
                 delete jcp;
                 jcp = NULL;
             }
@@ -391,7 +513,7 @@
                 {
                     read_msg(jcp);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
                 jcp->recover_complete();
@@ -399,7 +521,7 @@
                 {
                     read_msg(jcp);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
                 delete jcp;
@@ -415,6 +537,94 @@
         }
     }
 
+    void TxnRecoveredReadTest() // As TxnRecoverReadTest, plus a second read pass after recover_complete()
+    {
+        std::vector<std::string> txn_list;
+        // Stack: journal objects with automatic storage
+        char* test_name = "TxnRecoveredReadTest_Stack";
+        try
+        {
+            { // Phase 1: write and commit, then let the journal go out of scope
+                rhm::journal::jcntl jc(test_name, "jdata", test_name);
+                jc.initialize();
+                create_xid(xid, 2, XID_SIZE); // Fills member xid; the heap section below reuses it
+                for (int m=0; m<NUM_MSGS; m++)
+                    enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
+                txn_commit(&jc, xid);
+            }
+            { // Phase 2: recover, read all messages, complete recovery, read them all again
+                rhm::journal::jcntl jc(test_name, "jdata", test_name);
+                jc.recover(txn_list);
+                for (int m=0; m<NUM_MSGS; m++)
+                {
+                    read_msg(&jc);
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+                jc.recover_complete();
+                for (int m=0; m<NUM_MSGS; m++)
+                {
+                    read_msg(&jc);
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+            }
+        }
+        catch (rhm::journal::jexception& e)
+        {
+            std::stringstream ss;
+            ss << e;
+            CPPUNIT_FAIL(ss.str());
+        }
+        // Heap: same sequence with journal objects created by new
+        test_name = "TxnRecoveredReadTest_Heap";
+        rhm::journal::jcntl* jcp = NULL;
+        try
+        {
+            {
+                jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+                jcp->initialize();
+                for (int m=0; m<NUM_MSGS; m++)
+                    enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid);
+                txn_commit(jcp, xid);
+                delete jcp;
+                jcp = NULL;
+            }
+            {
+                jcp = new rhm::journal::jcntl(test_name, "jdata", test_name); // Outer jcp, so the catch below can free it
+                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+                jcp->recover(txn_list);
+                for (int m=0; m<NUM_MSGS; m++)
+                {
+                    read_msg(jcp);
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+                jcp->recover_complete();
+                for (int m=0; m<NUM_MSGS; m++)
+                {
+                    read_msg(jcp);
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+                delete jcp;
+            }
+        }
+        catch (rhm::journal::jexception& e)
+        {
+            if (jcp)
+                delete jcp;
+            std::stringstream ss;
+            ss << e;
+            CPPUNIT_FAIL(ss.str());
+        }
+    }
+
     void RecoveredDequeueTest()
     {
         std::vector<std::string> txn_list;
@@ -426,7 +636,7 @@
                 rhm::journal::jcntl jc(test_name, "jdata", test_name);
                 jc.initialize();
                 for (int m=0; m<NUM_MSGS; m++)
-                    enq_msg(&jc, create_msg(msg, m));
+                    enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
             }
             {
                 rhm::journal::jcntl jc(test_name, "jdata", test_name);
@@ -435,7 +645,7 @@
                 {
                     read_msg(&jc);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
                 jc.recover_complete();
@@ -443,7 +653,7 @@
                 {
                     read_msg(&jc);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
                 for (int m=0; m<NUM_MSGS; m++)
@@ -466,7 +676,7 @@
                 CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
                 jcp->initialize();
                 for (int m=0; m<NUM_MSGS; m++)
-                    enq_msg(jcp, create_msg(msg, m));
+                    enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
                 delete jcp;
                 jcp = NULL;
             }
@@ -478,7 +688,7 @@
                 {
                     read_msg(jcp);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
                 jcp->recover_complete();
@@ -486,7 +696,7 @@
                 {
                     read_msg(jcp);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
                 for (int m=0; m<NUM_MSGS; m++)
@@ -504,6 +714,98 @@
         }
     }
 
+    void TxnRecoveredDequeueTest() // As TxnRecoveredReadTest, then dequeue rids 0..NUM_MSGS-1
+    {
+        std::vector<std::string> txn_list;
+        // Stack: journal objects with automatic storage
+        char* test_name = "TxnRecoveredDequeueTest_Stack";
+        try
+        {
+            { // Phase 1: write and commit, then let the journal go out of scope
+                rhm::journal::jcntl jc(test_name, "jdata", test_name);
+                jc.initialize();
+                create_xid(xid, 3, XID_SIZE); // Fills member xid; the heap section below reuses it
+                for (int m=0; m<NUM_MSGS; m++)
+                    enq_txn_msg(&jc, create_msg(msg, m, MSG_SIZE), xid);
+                txn_commit(&jc, xid);
+            }
+            { // Phase 2: recover, read, complete recovery, read again, then dequeue
+                rhm::journal::jcntl jc(test_name, "jdata", test_name);
+                jc.recover(txn_list);
+                for (int m=0; m<NUM_MSGS; m++)
+                {
+                    read_msg(&jc);
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+                jc.recover_complete();
+                for (int m=0; m<NUM_MSGS; m++)
+                {
+                    read_msg(&jc);
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+                for (int m=0; m<NUM_MSGS; m++)
+                    deq_msg(&jc, m); // Non-transactional dequeue, with m passed as the rid
+            }
+        }
+        catch (rhm::journal::jexception& e)
+        {
+            std::stringstream ss;
+            ss << e;
+            CPPUNIT_FAIL(ss.str());
+        }
+        // Heap: same sequence with journal objects created by new
+        test_name = "TxnRecoveredDequeueTest_Heap";
+        rhm::journal::jcntl* jcp = NULL;
+        try
+        {
+            {
+                jcp = new rhm::journal::jcntl(test_name, "jdata", test_name);
+                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+                jcp->initialize();
+                for (int m=0; m<NUM_MSGS; m++)
+                    enq_txn_msg(jcp, create_msg(msg, m, MSG_SIZE), xid);
+                txn_commit(jcp, xid);
+                delete jcp;
+                jcp = NULL;
+            }
+            {
+                jcp = new rhm::journal::jcntl(test_name, "jdata", test_name); // Outer jcp, so the catch below can free it
+                CPPUNIT_ASSERT_MESSAGE("Journal heap instantiation failed.", jcp != NULL);
+                jcp->recover(txn_list);
+                for (int m=0; m<NUM_MSGS; m++)
+                {
+                    read_msg(jcp);
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+                jcp->recover_complete();
+                for (int m=0; m<NUM_MSGS; m++)
+                {
+                    read_msg(jcp);
+                    CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
+                    cleanMessage();
+                }
+                for (int m=0; m<NUM_MSGS; m++)
+                    deq_msg(jcp, m);
+                delete jcp;
+            }
+        }
+        catch (rhm::journal::jexception& e)
+        {
+            if (jcp)
+                delete jcp;
+            std::stringstream ss;
+            ss << e;
+            CPPUNIT_FAIL(ss.str());
+        }
+    }
+
     void ComplexRecoveryTest1()
     {
         std::vector<std::string> txn_list;
@@ -516,7 +818,7 @@
                 jc.initialize();
                 // rids: 0 to NUM_MSGS*2 - 1
                 for (int m=0; m<NUM_MSGS*2; m++)
-                    enq_msg(&jc, create_msg(msg, m));
+                    enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
                 // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
                 for (int m=0; m<NUM_MSGS; m++)
                     deq_msg(&jc, m);
@@ -525,7 +827,7 @@
                 {
                     read_msg(&jc);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
-                            create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
             }
@@ -536,26 +838,26 @@
                 {
                     read_msg(&jc);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
                 jc.recover_complete();
                 // rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
                 for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
-                    enq_msg(&jc, create_msg(msg, m));
+                    enq_msg(&jc, create_msg(msg, m, MSG_SIZE));
                 jc.flush();
                 for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
                 {
                     read_msg(&jc);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
-                            create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
                 for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
                 {
                     read_msg(&jc);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt after recover.",
-                            create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
                 // rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
@@ -582,7 +884,7 @@
                 jcp->initialize();
                 // rids: 0 to NUM_MSGS*2 - 1
                 for (int m=0; m<NUM_MSGS*2; m++)
-                    enq_msg(jcp, create_msg(msg, m));
+                    enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
                 // rids: NUM_MSGS*2 to NUM_MSGS*3 - 1
                 for (int m=0; m<NUM_MSGS; m++)
                     deq_msg(jcp, m);
@@ -591,7 +893,7 @@
                 {
                     read_msg(jcp);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt before recover.",
-                            create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
                 delete jcp;
@@ -605,26 +907,26 @@
                 {
                     read_msg(jcp);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt during recover.",
-                            create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
                 jcp->recover_complete();
                 // rids: NUM_MSGS*3 to NUM_MSGS*4 - 1
                 for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
-                    enq_msg(jcp, create_msg(msg, m));
+                    enq_msg(jcp, create_msg(msg, m, MSG_SIZE));
                 jcp->flush();
                 for (int m=NUM_MSGS; m<NUM_MSGS*2; m++)
                 {
                     read_msg(jcp);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
                 for (int m=NUM_MSGS*3; m<NUM_MSGS*4; m++)
                 {
                     read_msg(jcp);
                     CPPUNIT_ASSERT_MESSAGE("Message corrupt after recovery.",
-                            create_msg(msg, m).compare(std::string((char*)mbuff, msize)) == 0);
+                            create_msg(msg, m, MSG_SIZE).compare(std::string((char*)mbuff, msize)) == 0);
                     cleanMessage();
                 }
                 // rids: NUM_MSGS*4 to NUM_MSGS*6 - 1
@@ -809,6 +1111,16 @@
                 dtp, false), jc, aio_sleep_cnt, dtp));
     }
 
+    void enq_txn_msg(rhm::journal::jcntl* jc, const std::string msg, const std::string xid) // Enqueue msg under transaction xid
+    {
+        rhm::journal::data_tok* dtp = new rhm::journal::data_tok; // NOTE(review): not deleted here; confirm who frees it on success
+        CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+
+        unsigned aio_sleep_cnt = 0;
+        while (handle_jcntl_response(jc->enqueue_txn_data_record(msg.c_str(), msg.size(), msg.size(),
+                dtp, xid, false), jc, aio_sleep_cnt, dtp)); // Empty body: repeat the call while the handler returns true
+    }
+
     void deq_msg(rhm::journal::jcntl* jc, u_int64_t rid)
     {
         rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
@@ -820,6 +1132,36 @@
         while (handle_jcntl_response(jc->dequeue_data_record(dtp), jc, aio_sleep_cnt, dtp));
     }
 
+    void deq_txn_msg(rhm::journal::jcntl* jc, u_int64_t rid, const std::string xid) // Dequeue record rid under transaction xid
+    {
+        rhm::journal::data_tok* dtp = new rhm::journal::data_tok; // NOTE(review): not deleted here; confirm who frees it on success
+        CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+        dtp->set_wstate(rhm::journal::data_tok::ENQ); // Token is set up by hand: write state ENQ ...
+        dtp->set_rid(rid); // ... and the rid of the record to dequeue
+
+        unsigned aio_sleep_cnt = 0;
+        while (handle_jcntl_response(jc->dequeue_txn_data_record(dtp, xid),
+                jc, aio_sleep_cnt, dtp)); // Empty body: repeat the call while the handler returns true
+    }
+    
+    void txn_abort(rhm::journal::jcntl* jc, const std::string xid) // Call jc->txn_abort() for transaction xid
+    {
+        rhm::journal::data_tok* dtp = new rhm::journal::data_tok; // NOTE(review): not deleted here; confirm who frees it on success
+        CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+
+        unsigned aio_sleep_cnt = 0;
+        while (handle_jcntl_response(jc->txn_abort(dtp, xid), jc, aio_sleep_cnt, dtp)); // Repeat while the handler returns true
+    }
+    
+    void txn_commit(rhm::journal::jcntl* jc, const std::string xid) // Call jc->txn_commit() for transaction xid
+    {
+        rhm::journal::data_tok* dtp = new rhm::journal::data_tok; // NOTE(review): not deleted here; confirm who frees it on success
+        CPPUNIT_ASSERT_MESSAGE("Data Token heap intantiation failed.", dtp != NULL);
+
+        unsigned aio_sleep_cnt = 0;
+        while (handle_jcntl_response(jc->txn_commit(dtp, xid), jc, aio_sleep_cnt, dtp)); // Repeat while the handler returns true
+    }
+
     char* read_msg(rhm::journal::jcntl* jc)
     {
         rhm::journal::data_tok* dtp = new rhm::journal::data_tok;
@@ -849,34 +1191,47 @@
                 else
                 {
                     delete dtp;
-                    CPPUNIT_FAIL("dequeue_data(): timeout on RHM_IORES_AIO_WAIT.");
+                    CPPUNIT_FAIL("Timeout on RHM_IORES_AIO_WAIT.");
                 }
                 break;
             case rhm::journal::RHM_IORES_EMPTY:
                 delete dtp;
-                CPPUNIT_FAIL("dequeue_data(): RHM_IORES_EMPTY.");
+                CPPUNIT_FAIL("RHM_IORES_EMPTY");
             case rhm::journal::RHM_IORES_FULL:
                 delete dtp;
-                CPPUNIT_FAIL("dequeue_data(): RHM_IORES_FULL.");
+                CPPUNIT_FAIL("RHM_IORES_FULL");
             case rhm::journal::RHM_IORES_BUSY:
                 delete dtp;
-                CPPUNIT_FAIL("dequeue_data(): RHM_IORES_BUSY.");
+                CPPUNIT_FAIL("RHM_IORES_BUSY");
+            case rhm::journal::RHM_IORES_TXPENDING:
+                delete dtp;
+                CPPUNIT_FAIL("RHM_IORES_TXPENDING");
             default:
                 delete dtp;
-                CPPUNIT_FAIL("dequeue_data(): unknown return value.");
+                CPPUNIT_FAIL("unknown return value");
         }
         return true;
     }
     
-    static std::string& create_msg(std::string& s, int msg_num)
+    static std::string& create_msg(std::string& s, int msg_num, int len) // Builds the test message into s and returns s
     {
         std::stringstream ss;
-        ss << "Message_" << std::setfill('0') << std::setw(4) << msg_num;
-        ss << "_4567890123456789012345678901234567890";
-        ss << "12345678901234567890123456789012345678901234567890"; // 100 chars long (2 dblks)
+        ss << "Message_" << std::setfill('0') << std::setw(4) << msg_num << "_"; // 13-char prefix when msg_num < 10000
+        for (int i=14; i<=len; i++) // i is the 1-based position of the next char; appends nothing if len < 14
+            ss << (char)('0' + i%10); // Filler digit is the last decimal digit of the position
         s.assign(ss.str());
         return s;
     }
+    
+    static std::string& create_xid(std::string& s, int msg_num, int len) // Builds the test xid into s and returns s
+    {
+        std::stringstream ss;
+        ss << "XID_" << std::setfill('0') << std::setw(4) << msg_num << "_"; // 9-char prefix when msg_num < 10000
+        for (int i=9; i<len; i++) // i is the 0-based position of the next char; appends nothing if len <= 9
+            ss << (char)('a' + i%26); // Filler cycles through the letters 'a'..'z' by position
+        s.assign(ss.str());
+        return s;
+    }
 
     void runEncodeTest(const unsigned num_msgs, const unsigned min_msg_size,
             const unsigned max_msg_szie, const bool auto_deq, const unsigned min_xid_size,

Modified: store/trunk/cpp/tests/jrnl/rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/rtest	2007-10-10 18:39:30 UTC (rev 1000)
+++ store/trunk/cpp/tests/jrnl/rtest	2007-10-10 21:36:41 UTC (rev 1001)
@@ -30,8 +30,8 @@
 
 NUM_JFILES=8
 VG_ITERATIONS=1
-#VG_NORM_FILESIZE=11
-VG_NORM_FILESIZE=18 # RHEL5 triggers extra valgrind messages when pthreads are in use
+VG_NORM_FILESIZE=11
+#VG_NORM_FILESIZE=18 # RHEL5 triggers extra valgrind messages when pthreads are in use
 
 # Write test
 W_DO_TEST=T
@@ -58,8 +58,8 @@
 RM_DIR="${RM} -rf"
 TEST_PROG="./jtest"
 CHK_PROG="./janalyze.py"
-VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes"
-#VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes --suppressions=/usr/lib/valgrind/glibc-2.5.supp"
+#VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes"
+VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes --suppressions=/usr/lib/valgrind/glibc-2.5.supp"
 MAKE="make -f Makefile.rtest"
 
 




More information about the rhmessaging-commits mailing list