[rhmessaging-commits] rhmessaging commits: r996 - in store/trunk/cpp: lib/jrnl and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Wed Oct 10 10:38:10 EDT 2007


Author: kpvdr
Date: 2007-10-10 10:38:09 -0400 (Wed, 10 Oct 2007)
New Revision: 996

Modified:
   store/trunk/cpp/lib/JournalImpl.cpp
   store/trunk/cpp/lib/JournalImpl.h
   store/trunk/cpp/lib/jrnl/data_tok.hpp
   store/trunk/cpp/lib/jrnl/enq_map.cpp
   store/trunk/cpp/lib/jrnl/enq_map.hpp
   store/trunk/cpp/lib/jrnl/jcntl.cpp
   store/trunk/cpp/lib/jrnl/jcntl.hpp
   store/trunk/cpp/lib/jrnl/rmgr.cpp
   store/trunk/cpp/lib/jrnl/rrfc.hpp
   store/trunk/cpp/lib/jrnl/txn_map.cpp
   store/trunk/cpp/lib/jrnl/txn_map.hpp
   store/trunk/cpp/lib/jrnl/wmgr.cpp
   store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
   store/trunk/cpp/tests/jrnl/rtest
Log:
Added transaction handling. Only recover now remains incomplete to get transactions done.

Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp	2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/JournalImpl.cpp	2007-10-10 14:38:09 UTC (rev 996)
@@ -33,3 +33,16 @@
 
 JournalImpl::~JournalImpl()
 {}
+
+void
+JournalImpl::recover(std::deque<journal::data_tok*>* rd_dtokl, const journal::aio_cb rd_cb,
+        std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
+        boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list) throw (journal::jexception)
+{
+    // Create list of prepared xids
+    std::vector<std::string> prep_xid_list;
+    for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
+            i != prep_tx_list.end(); i++)
+        prep_xid_list.push_back(i->xid);
+    journal::jcntl::recover(rd_dtokl, rd_cb, wr_dtokl, wr_cb, prep_xid_list);
+}

Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h	2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/JournalImpl.h	2007-10-10 14:38:09 UTC (rev 996)
@@ -25,6 +25,9 @@
 #define _JournalImpl_
 
 #include "jrnl/jcntl.hpp"
+#include "jrnl/data_tok.hpp"
+#include "PreparedTransaction.h"
+#include <boost/ptr_container/ptr_list.hpp>
 
 namespace rhm {
     namespace bdbstore {
@@ -36,6 +39,17 @@
                         const std::string& journalDirectory,
                         const std::string& journalBaseFilename);
             ~JournalImpl();
+            void recover(std::deque<journal::data_tok*>* rd_dtokl, const journal::aio_cb rd_cb,
+			        std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
+                    boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list)
+                    throw (journal::jexception);
+
+            void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list)
+                    throw (journal::jexception)
+            {
+                recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
+                    &aio_wr_callback, prep_tx_list);
+            }
         };
 
         } // namespace bdbstore

Modified: store/trunk/cpp/lib/jrnl/data_tok.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/data_tok.hpp	2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/data_tok.hpp	2007-10-10 14:38:09 UTC (rev 996)
@@ -108,6 +108,7 @@
         size_t      _dsize;         ///< Data size in bytes
         u_int32_t   _dblks_written; ///< Data blocks read/written
         u_int32_t   _dblks_read;    ///< Data blocks read/written
+        u_int16_t   _fid;           ///< FID containing header of enqueue record
         u_int64_t   _rid;           ///< RID of data set by enqueue operation
         std::string _xid;           ///< XID set by enqueue operation
         u_int64_t   _dequeue_rid;   ///< RID of data set by dequeue operation
@@ -146,6 +147,8 @@
         inline void incr_dblocks_read(u_int32_t dblks_read) { _dblks_read += dblks_read; }
         inline void set_dblocks_read(u_int32_t dblks_read) { _dblks_read = dblks_read; }
 
+        inline const u_int16_t fid() const { return _fid; }
+        inline void set_fid(const u_int16_t fid) { _fid = fid; }
         inline const u_int64_t rid() const { return _rid; }
         inline void set_rid(const u_int64_t rid) { _rid = rid; }
         inline const u_int64_t dequeue_rid() const throw (jexception) {return _dequeue_rid; }

Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp	2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp	2007-10-10 14:38:09 UTC (rev 996)
@@ -120,7 +120,7 @@
 }
 
 void
-enq_map::lock(const u_int64_t rid)
+enq_map::lock(const u_int64_t rid) throw (jexception)
 {
     pthread_mutex_lock(&_mutex);
     emap_itr itr = _map.find(rid);
@@ -136,7 +136,7 @@
 }
 
 void
-enq_map::unlock(const u_int64_t rid)
+enq_map::unlock(const u_int64_t rid) throw (jexception)
 {
     pthread_mutex_lock(&_mutex);
     emap_itr itr = _map.find(rid);
@@ -152,7 +152,7 @@
 }
 
 const bool
-enq_map::is_locked(const u_int64_t rid)
+enq_map::is_locked(const u_int64_t rid) throw (jexception)
 {
     pthread_mutex_lock(&_mutex);
     emap_itr itr = _map.find(rid);

Modified: store/trunk/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.hpp	2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/enq_map.hpp	2007-10-10 14:38:09 UTC (rev 996)
@@ -75,9 +75,9 @@
         void insert_fid(const u_int64_t rid, const u_int16_t fid, bool locked) throw (jexception);
         const u_int16_t get_fid(const u_int64_t rid) throw (jexception);
         const u_int16_t get_remove_fid(const u_int64_t rid) throw (jexception);
-        void lock(const u_int64_t rid);
-        void unlock(const u_int64_t rid);
-        const bool is_locked(const u_int64_t rid);
+        void lock(const u_int64_t rid) throw (jexception);
+        void unlock(const u_int64_t rid) throw (jexception);
+        const bool is_locked(const u_int64_t rid) throw (jexception);
         inline void clear() { _map.clear(); }
         inline const bool empty() const { return _map.empty(); }
         inline const u_int16_t size() const { return (u_int16_t)_map.size(); }

Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp	2007-10-10 14:38:09 UTC (rev 996)
@@ -123,21 +123,14 @@
 
 void
 jcntl::recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb, std::deque<data_tok*>* wr_dtokl,
-        const aio_cb wr_cb, boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list)
-        throw (jexception)
+        const aio_cb wr_cb, const std::vector<std::string>& prep_txn_list) throw (jexception)
 {
-    // Create list of prepared xids
-    std::set<std::string> prep_xid_list;
-    for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
-            i != prep_tx_list.end(); i++);
-// TODO!
-//        prep_xid_list.insert(i->??);
-    
     // Verify journal dir and journal files
     _jdir.verify_dir();
     _rcvdat.reset();
     _emap.clear();
-    rcvr_janalyze(_rcvdat);
+    _tmap.clear();
+    rcvr_janalyze(_rcvdat, prep_txn_list);
 
     if (_datafh)
     {
@@ -354,7 +347,7 @@
 }
 
 void
-jcntl::rcvr_janalyze(rcvdat& rd) throw (jexception)
+jcntl::rcvr_janalyze(rcvdat& rd, const std::vector<std::string>& prep_txn_list) throw (jexception)
 {
     jinf ji(_jdir.dirname() + "/" + _base_filename + "." + JRNL_INFO_EXTENSION, true);
     try
@@ -367,7 +360,6 @@
         if (e.err_code() != jerrno::JERR_JINF_JDATEMPTY)
             throw e;
     }
-//std::cout << "f" << rd._ffid << (rd._empty?"e":"") << " ";
 
     // Restore all read and write pointers
     if (!rd._empty)
@@ -376,13 +368,14 @@
         for (u_int16_t fnum=0; fnum<JRNL_NUM_FILES && !eoj; fnum++)
         {
             u_int16_t fid = (fnum + rd._ffid) % JRNL_NUM_FILES;
-            eoj = rcvr_fanalyze(fid, rd);
+            eoj = rcvr_fanalyze(fid, rd, prep_txn_list);
         }
     }
 }
 
 const bool
-jcntl::rcvr_fanalyze(u_int16_t fid, rcvdat& rd) throw (jexception)
+jcntl::rcvr_fanalyze(u_int16_t fid, rcvdat& rd, const std::vector<std::string>& /*prep_txn_list*/)
+        throw (jexception)
 {
     bool eoj = false;
     std::stringstream ss;
@@ -466,6 +459,12 @@
                     jifs.seekg(foffs);
                 }
                 break;
+            case RHM_JDAT_TXA_MAGIC:
+//std::cout << " a";
+                break;
+            case RHM_JDAT_TXC_MAGIC:
+//std::cout << " c";
+                break;
             case RHM_JDAT_EMPTY_MAGIC:
                 {
 //std::cout << " x";

Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp	2007-10-10 14:38:09 UTC (rev 996)
@@ -49,8 +49,6 @@
 #include <jrnl/wmgr.hpp>
 #include <jrnl/wrfc.hpp>
 #include <qpid/broker/PersistableQueue.h>
-#include <PreparedTransaction.h>
-#include <boost/ptr_container/ptr_list.hpp>
 
 namespace rhm
 {
@@ -69,7 +67,7 @@
     */
     class jcntl : public qpid::broker::ExternalQueueStore 
     {
-    private:
+    protected:
         /**
         * \brief Journal ID
         *
@@ -226,7 +224,7 @@
         */
         void recover(std::deque<data_tok*>* rd_dtokl, const aio_cb rd_cb,
 			std::deque<data_tok*>* wr_dtokl, const aio_cb wr_cb,
-            boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list) throw (jexception);
+            const std::vector<std::string>& prep_txn_list) throw (jexception);
 
         /**
         * \brief Recover using internal default callbacks and data_tok lists.
@@ -235,11 +233,10 @@
         *
         * \exception TODO
         */
-        void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list)
-                throw (jexception)
+        void recover(const std::vector<std::string>& prep_txn_list) throw (jexception)
         {
             recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
-                    &aio_wr_callback, prep_tx_list);
+                    &aio_wr_callback, prep_txn_list);
         }
 
         /**
@@ -590,7 +587,7 @@
 
 
 
-    private:
+    protected:
         /**
         * \brief Check status of journal before allowing write operations.
         */
@@ -614,14 +611,16 @@
         /**
         * \brief Analyze journal for recovery.
         */
-        void rcvr_janalyze(rcvdat& jrs) throw (jexception);
+        void rcvr_janalyze(rcvdat& jrs, const std::vector<std::string>& prep_txn_list)
+                throw (jexception);
 
         /**
         * \brief Analyze a particular journal file for recovery.
         *
         * \return <b><i>true</i></b> if end of journal (eoj) found; <b><i>false</i></b> otherwise.
         */
-        const bool rcvr_fanalyze(u_int16_t fid, rcvdat& jrs) throw (jexception);
+        const bool rcvr_fanalyze(u_int16_t fid, rcvdat& jrs,
+                const std::vector<std::string>& prep_txn_list) throw (jexception);
 
 	/**
 	* Intenal callback write

Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp	2007-10-10 14:38:09 UTC (rev 996)
@@ -295,6 +295,19 @@
                 if (is_enq) // ok, this record is enqueued, check it, then read it...
                 {
 //std::cout << "e" << std::flush;
+#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
+                    // Is this locked by a pending dequeue transaction?
+                    try
+                    {
+                        if (_emap.is_locked(_hdr._rid))
+                            return RHM_IORES_TXPENDING;
+                    }
+                    catch (jexception e)
+                    {
+                        if (e.err_code() != jerrno::JERR_MAP_NOTFOUND)
+                            throw e;
+                    }
+#endif
                     if (dtokp->rid())
                     {
                         if (_hdr._rid != dtokp->rid())

Modified: store/trunk/cpp/lib/jrnl/rrfc.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rrfc.hpp	2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/rrfc.hpp	2007-10-10 14:38:09 UTC (rev 996)
@@ -104,7 +104,10 @@
         inline const int fh() const { return _curr_fh->rd_fh(); }
         inline const u_int32_t enqcnt() const { return _curr_fh->enqcnt(); }
         inline const u_int32_t incr_enqcnt() { return _curr_fh->incr_enqcnt(); }
+        inline const u_int32_t incr_enqcnt(u_int16_t fid) { return _fh_arr[fid]->incr_enqcnt(); }
         inline const u_int32_t add_enqcnt(u_int32_t a) { return _curr_fh->add_enqcnt(a); }
+        inline const u_int32_t add_enqcnt(u_int16_t fid, u_int32_t a)
+                { return _fh_arr[fid]->add_enqcnt(a); }
         inline const u_int32_t decr_enqcnt(u_int16_t fid) { return _fh_arr[fid]->decr_enqcnt(); }
         inline const u_int32_t subtr_enqcnt(u_int16_t fid, u_int32_t s)
                 { return _fh_arr[fid]->subtr_enqcnt(s); }

Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp	2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp	2007-10-10 14:38:09 UTC (rev 996)
@@ -41,6 +41,12 @@
 namespace journal
 {
 
+txn_data_struct::txn_data_struct(const u_int64_t rid, const u_int16_t fid, const bool enq_flag):
+        _rid(rid),
+        _fid(fid),
+        _enq_flag(enq_flag)
+{}
+
 txn_map::txn_map():
         _map()
 {
@@ -53,25 +59,24 @@
 }
 
 void
-txn_map::insert_rid_fid(const std::string& xid, const u_int64_t rid, const u_int16_t fid)
-        throw (jexception)
+txn_map::insert_txn_data(const std::string& xid, const txn_data& td) throw (jexception)
 {
-    rid_fid_pair rec(rid, fid);
     pthread_mutex_lock(&_mutex);
     xmap_itr itr = _map.find(xid);
     pthread_mutex_unlock(&_mutex);
     if (itr == _map.end()) // not found in map
     {
-        rid_fid_list list;
-        list.push_back(rec);
+        txn_data_list list;
+        list.push_back(td);
         std::pair<xmap_itr, bool> ret = _map.insert(xmap_param(xid, list));
+        // TODO: check for failure here?
     }
     else
-        itr->second.push_back(rec);
+        itr->second.push_back(td);
 }
 
-const txn_map::rid_fid_list
-txn_map::get_rid_fid_list(const std::string& xid) throw (jexception)
+const txn_data_list
+txn_map::get_tdata_list(const std::string& xid) throw (jexception)
 {
     pthread_mutex_lock(&_mutex);
     xmap_itr itr = _map.find(xid);
@@ -85,8 +90,8 @@
     return itr->second;
 }
 
-const txn_map::rid_fid_list
-txn_map::get_remove_rid_fid_list(const std::string& xid) throw (jexception)
+const txn_data_list
+txn_map::get_remove_tdata_list(const std::string& xid) throw (jexception)
 {
     pthread_mutex_lock(&_mutex);
     xmap_itr itr = _map.find(xid);
@@ -97,7 +102,7 @@
         ss << std::hex << "xid=\"" << xid << "\"";
         throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "get_remove_fid");
     }
-    rid_fid_list list = itr->second;
+    txn_data_list list = itr->second;
     _map.erase(itr);
     pthread_mutex_unlock(&_mutex);
     return list;

Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp	2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp	2007-10-10 14:38:09 UTC (rev 996)
@@ -42,7 +42,6 @@
 }
 
 #include <map>
-#include <list>
 #include <pthread.h>
 #include <vector>
 #include <jrnl/jexception.hpp>
@@ -52,15 +51,22 @@
 namespace journal
 {
 
+    struct txn_data_struct
+    {
+        u_int64_t _rid;
+        u_int16_t _fid;
+        bool _enq_flag;
+        txn_data_struct(const u_int64_t rid, const u_int16_t fid, const bool enq_flag);
+    };
+    typedef txn_data_struct txn_data;
+    typedef std::vector<txn_data> txn_data_list;
+    typedef txn_data_list::iterator tdl_itr;
+
     class txn_map
     {
-    public:
-        typedef std::pair<u_int64_t, u_int16_t> rid_fid_pair;
-        typedef std::list<rid_fid_pair> rid_fid_list;
-
     private:
-        typedef std::pair<std::string, rid_fid_list> xmap_param;
-        typedef std::map<std::string, rid_fid_list> xmap;
+        typedef std::pair<std::string, txn_data_list> xmap_param;
+        typedef std::map<std::string, txn_data_list> xmap;
         typedef xmap::iterator xmap_itr;
 
         xmap _map;
@@ -70,10 +76,9 @@
         txn_map();
         ~txn_map();
 
-        void insert_rid_fid(const std::string& xid, const u_int64_t rid, const u_int16_t fid)
-                throw (jexception);
-        const rid_fid_list get_rid_fid_list(const std::string& xid) throw (jexception);
-        const rid_fid_list get_remove_rid_fid_list(const std::string& xid) throw (jexception);
+        void insert_txn_data(const std::string& xid, const txn_data& td) throw (jexception);
+        const txn_data_list get_tdata_list(const std::string& xid) throw (jexception);
+        const txn_data_list get_remove_tdata_list(const std::string& xid) throw (jexception);
         const u_int32_t get_rid_count(const std::string& xid) throw (jexception);
         inline void clear() { _map.clear(); }
         inline const bool empty() const { return _map.empty(); }

Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp	2007-10-10 14:38:09 UTC (rev 996)
@@ -141,13 +141,10 @@
         u_int32_t data_offs_dblks = dtokp->dblocks_written();
         u_int32_t ret = _enq_rec.encode(wptr, data_offs_dblks,
                 (JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
-#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
+
+        // Remember fid which contains the record header in case record is split over several files
         if (data_offs_dblks == 0)
-        {
-            _wrfc.incr_enqcnt();
-            _emap.insert_fid(rid, _wrfc.index());
-        }
-#endif
+            dtokp->set_fid(_wrfc.index());
         _pg_offset_dblks += ret;
         _cached_offset_dblks += ret;
         dtokp->incr_dblocks_written(ret);
@@ -163,6 +160,18 @@
             // message. AIO callbacks will then only process this token when entire message is
             // enqueued.
             _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
+            _wrfc.incr_enqcnt(dtokp->fid());
+
+#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
+            if (xid_len) // If part of transaction, add to transaction map
+            {
+                std::string xid((char*)xid_ptr, xid_len);
+                _tmap.insert_txn_data(xid, txn_data(rid, dtokp->fid(), true));
+            }
+            else
+                _emap.insert_fid(rid, dtokp->fid());
+#endif
+
             done = true;
         }
 
@@ -263,13 +272,10 @@
         u_int32_t data_offs_dblks = dtokp->dblocks_written();
         u_int32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
                 (JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
-#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
+
+        // Remember fid which contains the record header in case record is split over several files
         if (data_offs_dblks == 0)
-        {
-            u_int16_t fid = _emap.get_remove_fid(dtokp->rid());
-            _wrfc.decr_enqcnt(fid);
-        }
-#endif
+            dtokp->set_fid(_wrfc.index());
         _pg_offset_dblks += ret;
         _cached_offset_dblks += ret;
         dtokp->incr_dblocks_written(ret);
@@ -280,6 +286,23 @@
             // TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO returns.
             dtokp->set_wstate(data_tok::DEQ_SUBM);
             _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
+
+#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
+            if (xid_len) // If part of transaction, add to transaction map
+            {
+                // If the enqueue is part of a pending txn, it will not yet be in emap
+                try { _emap.lock(rid); }
+                catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
+                std::string xid((char*)xid_ptr, xid_len);
+                _tmap.insert_txn_data(xid, txn_data(rid, dtokp->fid(), false));
+            }
+            else
+            {
+                u_int16_t fid = _emap.get_remove_fid(dtokp->rid());
+                _wrfc.decr_enqcnt(fid);
+            }
+#endif
+
             done = true;
         }
 
@@ -377,6 +400,10 @@
         u_int32_t data_offs_dblks = dtokp->dblocks_written();
         u_int32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
                 (JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
+
+        // Remember fid which contains the record header in case record is split over several files
+        if (data_offs_dblks == 0)
+            dtokp->set_fid(_wrfc.index());
         _pg_offset_dblks += ret;
         _cached_offset_dblks += ret;
         dtokp->incr_dblocks_written(ret);
@@ -386,6 +413,20 @@
         {
             dtokp->set_wstate(data_tok::ABORT_SUBM);
             _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
+
+#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
+            // Delete this txn from tmap, unlock any locked records in emap
+            std::string xid((char*)xid_ptr, xid_len);
+            txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+            for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+            {
+                try { _emap.unlock(itr->_rid); }
+                catch(jexception e) { if (e.err_code() != jerrno::JERR_MAP_NOTFOUND) throw e; }
+                if (itr->_enq_flag)
+                    _wrfc.decr_enqcnt(itr->_fid);
+            }
+#endif
+
             done = true;
         }
 
@@ -483,6 +524,10 @@
         u_int32_t data_offs_dblks = dtokp->dblocks_written();
         u_int32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
                 (JRNL_WMGR_PAGE_SIZE * JRNL_SBLK_SIZE) - _pg_offset_dblks);
+
+        // Remember fid which contains the record header in case record is split over several files
+        if (data_offs_dblks == 0)
+            dtokp->set_fid(_wrfc.index());
         _pg_offset_dblks += ret;
         _cached_offset_dblks += ret;
         dtokp->incr_dblocks_written(ret);
@@ -492,6 +537,23 @@
         {
             dtokp->set_wstate(data_tok::COMMIT_SUBM);
             _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
+
+#if !(defined(RHM_WRONLY) || defined(RHM_RDONLY))
+            // Delete this txn from tmap, process records into emap
+            std::string xid((char*)xid_ptr, xid_len);
+            txn_data_list tdl = _tmap.get_remove_tdata_list(xid);
+            for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+            {
+                if (itr->_enq_flag) // txn enqueue
+                    _emap.insert_fid(itr->_rid, itr->_fid);
+                else // txn dequeue
+                {
+                    u_int16_t fid = _emap.get_remove_fid(dtokp->rid());
+                    _wrfc.decr_enqcnt(fid);
+                }
+            }
+#endif
+
             done = true;
         }
 

Modified: store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/tests/jrnl/JournalSystemTests.cpp	2007-10-10 14:38:09 UTC (rev 996)
@@ -33,10 +33,8 @@
 #include "msg_producer.hpp"
 #include "msg_consumer.hpp"
 #include "jtest.hpp"
+#include <vector>
 
-#include <PreparedTransaction.h>
-#include <boost/ptr_container/ptr_list.hpp>
-
 #define NUM_MSGS 5
 #define MAX_AIO_SLEEPS 500
 #define AIO_SLEEP_TIME 1000
@@ -161,7 +159,7 @@
 
     void EmptyRecoverTest()
     {
-        boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
+        std::vector<std::string> txn_list;
         //Stack
         char* test_name = "EmptyRecoverTest_Stack";
         try
@@ -264,7 +262,7 @@
 
     void RecoverReadTest()
     {
-        boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
+        std::vector<std::string> txn_list;
         //Stack
         char* test_name = "RecoverReadTest_Stack";
         try
@@ -334,7 +332,7 @@
 
     void RecoveredReadTest()
     {
-        boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
+        std::vector<std::string> txn_list;
         //Stack
         char* test_name = "RecoveredReadTest_Stack";
         try
@@ -419,7 +417,7 @@
 
     void RecoveredDequeueTest()
     {
-        boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
+        std::vector<std::string> txn_list;
         //Stack
         char* test_name = "RecoveredDequeueTest_Stack";
         try
@@ -508,7 +506,7 @@
 
     void ComplexRecoveryTest1()
     {
-        boost::ptr_list<rhm::bdbstore::PreparedTransaction> txn_list;
+        std::vector<std::string> txn_list;
         //Stack
         char* test_name = "ComplexRecoveryTest1_Stack";
         try

Modified: store/trunk/cpp/tests/jrnl/rtest
===================================================================
--- store/trunk/cpp/tests/jrnl/rtest	2007-10-10 13:59:38 UTC (rev 995)
+++ store/trunk/cpp/tests/jrnl/rtest	2007-10-10 14:38:09 UTC (rev 996)
@@ -30,8 +30,8 @@
 
 NUM_JFILES=8
 VG_ITERATIONS=1
-VG_NORM_FILESIZE=11
-#VG_NORM_FILESIZE=18 # RHEL5 triggers extra valgrind messages when pthreads are in use
+#VG_NORM_FILESIZE=11
+VG_NORM_FILESIZE=18 # RHEL5 triggers extra valgrind messages when pthreads are in use
 
 # Write test
 W_DO_TEST=T
@@ -58,8 +58,8 @@
 RM_DIR="${RM} -rf"
 TEST_PROG="./jtest"
 CHK_PROG="./janalyze.py"
-#VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes"
-VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes --suppressions=/usr/lib/valgrind/glibc-2.5.supp"
+VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes"
+#VALGRIND="valgrind -q --track-fds=yes --leak-check=full --leak-resolution=high --show-reachable=yes --suppressions=/usr/lib/valgrind/glibc-2.5.supp"
 MAKE="make -f Makefile.rtest"
 
 




More information about the rhmessaging-commits mailing list